Thursday, October 5, 2023

Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels

In the last article, "Don't Use 'Task.WhenAll' for Interdependent Tasks", we saw how using WhenAll for tasks that are interdependent can cause an application to hang if one of the tasks throws an exception. As a solution, we changed from using "Task.WhenAll" to awaiting the tasks separately. But there are other approaches.

In this article, we will focus on breaking the dependency between the tasks. In the sample code, the channel is bounded (meaning, it has a maximum capacity). This can create a dependency between the Producer and Consumer. One way to break the dependency is to use an unbounded channel.

Let's do a quick review of the code that we saw yesterday and then look at how changing the channel can affect the code.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

One reason for the behavior was the way that a bounded channel created a dependency between the Producer and Consumer. If we can break that dependency, the behavior will change.

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior) is available here: https://github.com/jeremybytes/channel-exceptions.

Interdependent Tasks

We'll go back to the starting place we used in the previous article. Here are the relevant methods (available in the Program.cs file of the "refactored" project):


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

I won't go through this code again here. Check the previous article for a walkthrough. This code works fine for the success state, but it has strange behavior if the Consumer throws an exception.

Consumer Exception

If the Consumer throws an exception while processing an item, the entire application hangs:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11

Here's a reminder of what happens:

In the "ProducerConsumerWithExceptions" method, we create a Bounded Channel. This means that the capacity of the channel is limited to 10 items. When the channel has reached that capacity, no new items can be written to it until space is made available for them.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 10 more items onto the channel (indexes 1 through 10).

Then the Producer produces index 11 and tries to put it onto the channel. At this point, the channel is at capacity (it has 10 items already), so the Producer waits (in an async-friendly way) for there to be space to write the next item.

And this is where the application hangs. The Producer will wait forever. Since "Task.WhenAll" is waiting for the Producer and Consumer to both finish, it will also wait forever. The application will not exit on its own.
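Here's a minimal, self-contained sketch of that hang. It isn't the sample's Producer; it just writes directly to a bounded channel with the same capacity (10) and checks whether the 11th write can complete while nothing is reading from the channel.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel with the same capacity as the sample (10 items).
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

// Fill the channel to capacity. These writes complete immediately
// because there is still space available.
for (int i = 0; i < 10; i++)
{
    await channel.Writer.WriteAsync(i);
}

// The 11th write cannot complete: the channel is full and nothing is
// reading. Awaiting it here would wait forever, just like the Producer.
ValueTask pending = channel.Writer.WriteAsync(10);
bool waitingForSpace = !pending.IsCompleted;
Console.WriteLine($"Write of item 10 is waiting: {waitingForSpace}");

// Reading one item frees a slot, so the pending write can finish.
channel.Reader.TryRead(out int first);
await pending;
Console.WriteLine($"Read {first}; write of item 10 completed");
```

In the sample, nothing ever performs that final read (the Consumer has already stopped), so the pending write never completes.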

Using an Unbounded Channel

What I would like to do is remove the dependency between the Producer and Consumer. Even if the Consumer breaks, the Producer should keep going. One way of doing this is to use an Unbounded Channel (meaning, a channel with no capacity limit).

This requires only a small change to the ProducerConsumerWithExceptions method (this code is in the Program.cs file of the "unbounded-channel" project):


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateUnbounded<int>();

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

Now the Channel is no longer limited to 10 items.
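To see the difference in isolation, here's a small sketch (not code from the sample project) that writes 20 items to an unbounded channel with no Consumer reading at all. Every write completes without waiting.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// With nothing reading, every write still completes immediately,
// because an unbounded channel is never "at capacity".
int completedWrites = 0;
for (int i = 0; i < 20; i++)
{
    ValueTask write = channel.Writer.WriteAsync(i);
    if (write.IsCompletedSuccessfully) completedWrites++;
    await write;
}
Console.WriteLine($"Writes that completed without waiting: {completedWrites}");

// The items are simply buffered in the channel until something reads them.
int buffered = 0;
while (channel.Reader.TryRead(out _)) buffered++;
Console.WriteLine($"Items buffered in the channel: {buffered}");
```

The trade-off is memory: if the Consumer stops, items pile up in the channel instead of making the Producer wait.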

Error Output

Let's take a look at the error output now:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11
Producing something: 12
Producing something: 13
Producing something: 14
Producing something: 15
Producing something: 16
Producing something: 17
Producing something: 18
Producing something: 19
Bad thing happened in Consumer (0)
Done

The most important part of this output is that we have the "Done" message. This tells us that the application finished on its own. We also see the error message "Bad thing happened in Consumer (0)" just before the "Done".

Here's how things work with this code.

In the "ProducerConsumerWithExceptions" method, we create an Unbounded Channel. This means that the capacity of the channel is not limited.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 19 more items onto the channel (indexes 1 through 19). Once it has produced all 20 items, the Producer completes.

The Producer Task has completed (successfully) and the Consumer Task has completed (with error). This means that "await Task.WhenAll" can move forward.

Since one of the tasks got an exception, the "await" will raise that exception here. But "ProducerConsumerWithExceptions" does not handle the exception, so it bubbles up to the "Main" method.

The "Main" method has a try/catch block. It catches the thrown exception, prints the error message to the console, outputs "Done", and then the application exits.
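Here's a self-contained sketch of that whole flow. The Producer and Consumer methods are hypothetical stand-ins for the ones in the sample project (which aren't shown in this article): the Producer writes 20 items and then completes the channel, and the Consumer throws on the first item it reads. Since this version has no delays, the "Producing" and "Consuming" lines will not interleave the same way as in the output above.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

string result;
try
{
    await ProducerConsumerWithExceptions();
    result = "no exception";
}
catch (Exception ex)
{
    result = ex.Message;
}
Console.WriteLine(result);
Console.WriteLine("Done");

static async Task ProducerConsumerWithExceptions()
{
    var channel = Channel.CreateUnbounded<int>();

    Task producer = Producer(channel.Writer);
    Task consumer = Consumer(channel.Reader);

    // Both tasks finish (Producer successfully, Consumer with an error),
    // so awaiting WhenAll rethrows the Consumer's exception.
    await Task.WhenAll(producer, consumer);
}

// Hypothetical Producer: writes 20 items, then marks the channel complete.
static async Task Producer(ChannelWriter<int> writer)
{
    for (int i = 0; i < 20; i++)
    {
        Console.WriteLine($"Producing something: {i}");
        await writer.WriteAsync(i);
    }
    writer.Complete();
}

// Hypothetical Consumer: fails on the first item it reads.
static async Task Consumer(ChannelReader<int> reader)
{
    await foreach (int item in reader.ReadAllAsync())
    {
        Console.WriteLine($"Consuming object: {item}");
        throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
    }
}
```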

Bounded vs. Unbounded Channels

What we've seen here is that using a Bounded Channel can create a dependency between a Producer and Consumer. If the Consumer fails and the Channel reaches capacity, the Producer will never complete.

With an Unbounded Channel, we no longer have that same dependency.

I say that a Bounded Channel "can" create a dependency because it doesn't necessarily create a dependency. There are different ways to write to a Channel that would not create a dependency.

Writing to a Channel

There are multiple ways of writing to a channel. In this code, the Producer uses "WriteAsync":
    await writer.WriteAsync(i);

"WriteAsync" is a method on the ChannelWriter<T> class. If the channel is at capacity, it will wait until there is space available. This is the easiest way to write to a channel, so I tend to use this most of the time. It does have the downside of possibly waiting forever if space is never made available.

But there are other approaches. The ChannelWriter<T> also has a "TryWrite" method:
    bool success = writer.TryWrite(i);

"TryWrite" is not asynchronous. If it writes to the channel successfully, it returns "true". But if the channel is at capacity and the data cannot be written, it returns "false".

This has the advantage of not waiting forever. The disadvantage is that we need a little more code to handle the workflow. What happens if "TryWrite" returns "false"? Do we wait a bit and retry? Do we pause our Producer? Do we simply discard that item? Do we log the item and go to the next one? It depends on what our application needs to do. But the flexibility is there for us to decide.
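As one illustration, here's a sketch of a Producer-style loop that uses "TryWrite" with a "log it and skip it" policy. The small capacity (3) and the "skipped" list are assumptions made for the example; the right policy depends on the application.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// A small capacity makes the "channel is full" case easy to hit.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3));
var skipped = new List<int>();

for (int i = 0; i < 5; i++)
{
    // TryWrite never waits: it returns false if the channel is full.
    if (!channel.Writer.TryWrite(i))
    {
        // One possible policy: log the item and move on to the next one.
        Console.WriteLine($"Channel full; skipping item {i}");
        skipped.Add(i);
    }
}
```

Since nothing reads from the channel here, items 0 through 2 fill it and items 3 and 4 are skipped.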

To help us with the workflow, there is also a "WaitToWriteAsync" method:
    await writer.WaitToWriteAsync();

If the channel is at capacity, this will wait (in an async-friendly way) for space to be available. We can use this to be pretty sure that "TryWrite" will succeed. (But with parallel code, that isn't guaranteed: another writer could fill the free space between the two calls.)

When we await "WaitToWriteAsync", however, we are back to a situation where we may end up waiting forever if the channel is at capacity and no items are read off of it.
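Putting "WaitToWriteAsync" and "TryWrite" together, a common pattern is to loop: wait for space, try to write, and wait again if another writer got there first. In this sketch, "WriteWhenSpaceAsync" is a hypothetical helper name, not part of the Channels API. It still waits forever if nothing ever reads from a full channel; it only stops early if the channel is completed (in which case "WaitToWriteAsync" returns false).

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1));
channel.Writer.TryWrite(0); // fill the single slot

// Start writing item 1; it has to wait for space.
Task<bool> writeTask = WriteWhenSpaceAsync(channel.Writer, 1);
bool waitedForSpace = !writeTask.IsCompleted;

// Reading frees the slot, so the pending write can go through.
channel.Reader.TryRead(out _);
bool written = await writeTask;
channel.Reader.TryRead(out int second);
Console.WriteLine($"Waited: {waitedForSpace}, written: {written}, read back: {second}");

// Hypothetical helper: wait for space, then try to write. If another
// writer took the free slot first, TryWrite returns false and we wait again.
// Returns false if the channel is completed before the item is written.
static async Task<bool> WriteWhenSpaceAsync<T>(ChannelWriter<T> writer, T item)
{
    while (await writer.WaitToWriteAsync())
    {
        if (writer.TryWrite(item)) return true;
    }
    return false;
}
```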

Cancellation Tokens

The async methods on ChannelWriter<T> also take optional cancellation tokens:
    await writer.WriteAsync(i, cancellationToken);

We could be pretty creative with a cancellation token. For example, if the Consumer throws an exception, it could also cancel a CancellationTokenSource that it shares with the Producer, which puts the token into the "cancellation requested" state. Then "WriteAsync" would stop waiting and throw an OperationCanceledException.

The same is true if we use a cancellation token with "WaitToWriteAsync".
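Here's a sketch of that idea. The Producer and Consumer are hypothetical, and they share a CancellationTokenSource (an assumption for this example; the sample project doesn't do this). When the Consumer fails, it calls "Cancel" before rethrowing, so the Producer's pending "WriteAsync" throws an OperationCanceledException instead of waiting forever, even though the channel is still bounded to 10 items.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
using var cts = new CancellationTokenSource();

Task producer = Producer(channel.Writer, cts.Token);
Task consumer = Consumer(channel.Reader, cts);

string message;
try
{
    // Without the cancellation, this would hang: the Producer would wait forever for space.
    await Task.WhenAll(producer, consumer);
    message = "no exception";
}
catch (Exception ex)
{
    message = ex.Message;
}
Console.WriteLine(message);
Console.WriteLine($"Producer canceled: {producer.IsCanceled}");

// Hypothetical Producer: 20 items, but only 11 can ever fit (10 buffered + 1 read).
static async Task Producer(ChannelWriter<int> writer, CancellationToken token)
{
    for (int i = 0; i < 20; i++)
    {
        // If the token is canceled (before or during the wait for space),
        // this await throws an OperationCanceledException.
        await writer.WriteAsync(i, token);
    }
    writer.Complete();
}

// Hypothetical Consumer: fails on the first item and signals the Producer.
static async Task Consumer(ChannelReader<int> reader, CancellationTokenSource source)
{
    try
    {
        await foreach (int item in reader.ReadAllAsync())
        {
            throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
        }
    }
    catch
    {
        // Tell the Producer to stop waiting, then let the exception continue.
        source.Cancel();
        throw;
    }
}
```

An OperationCanceledException that escapes an async method marks its Task as canceled rather than faulted, so "await Task.WhenAll" reports the Consumer's exception.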

A Lot of Options

As with many things in C#, Channel<T> has a lot of options. This can be good because of the flexibility that we have as programmers. If we need to do something a little unusual, there are often methods and properties that help us out. The downside is that there is a bit more to learn, and it can be confusing to figure out which approach is the best one for your application.
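As one example of those options, a bounded channel doesn't have to make writers wait when it's full. "BoundedChannelOptions.FullMode" can be set to "Wait" (the default), "DropNewest", "DropOldest", or "DropWrite". This sketch (not from the sample project) uses "DropOldest", which keeps the channel bounded without making the Producer depend on the Consumer, at the cost of discarding data.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// A bounded channel that never makes writers wait: when it is full,
// the oldest buffered item is discarded to make room for the new one.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

for (int i = 0; i < 5; i++)
{
    // With DropOldest, TryWrite succeeds even when the channel is full.
    channel.Writer.TryWrite(i);
}

var remaining = new List<int>();
while (channel.Reader.TryRead(out int item)) remaining.Add(item);
Console.WriteLine(string.Join(", ", remaining)); // prints: 2, 3, 4
```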

More Solutions

So we've seen a couple of solutions to fix the weird behavior of the sample application when exceptions are thrown.

If we await the Producer and Consumer tasks separately, we do not need to be as concerned about them being dependent on one another.

If we change the Channel from Bounded to Unbounded, we remove the dependency between the Producer and Consumer.

If we add some exception handling inside the Producer and Consumer Tasks, we can continue processing data even if there are errors. (This approach is shown in the "doesnt-stop" project in the GitHub repository: https://github.com/jeremybytes/channel-exceptions.) We'll look at this in a future article.

Wrap Up

As we dive deeper into different ways of using asynchronous and parallel bits in our code, we often find strange behavior. And sometimes it can take a while to figure out where the strangeness is coming from. But the more that we learn, the easier it gets. So keep learning and keep writing amazing applications that make your users happy.

Happy Coding!