Friday, October 6, 2023

Producer/Consumer Exception Handling - A More Reliable Approach

In the last two articles, we looked at a couple of ways to deal with exceptions that can occur when using the Producer/Consumer pattern with Channel<T> in C#. (Prior articles: "Don't Use 'Task.WhenAll' for Interdependent Tasks" and "Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels".)

Both of these approaches are short-circuiting: if the Consumer fails on one item, no further items are processed. The same is true of the Producer: if the Producer fails on one item, no further items are produced. That may be fine for some applications. But what if we want something more robust?
By handling exceptions inside the Producer and Consumer, errors can be logged and processing can continue.
We can go back and look at the logged items to determine which items need to be redone. This is a more robust approach to exception handling for our sample application.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

Today we'll look at handling exceptions in a way that does not short-circuit the process.

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior) is available here: https://github.com/jeremybytes/channel-exceptions.

Interdependent Tasks

We'll go back to the starting place we used in the previous article. Here are the relevant methods (available in the Program.cs file of the "refactored" project):


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

I won't go through this code again here. Check the prior article for a walkthrough. This code works fine for the success state, but it has strange behavior if the Consumer throws an exception.

Consumer Exception

If the Consumer throws an exception while processing an item, the entire application hangs:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11

Here's a reminder of what happens:

In the "ProducerConsumerWithExceptions" method, we create a Bounded Channel. This means that the capacity of the channel is limited to 10 items. When the channel has reached that capacity, no new items can be written to it until space is made available for them.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 10 more items onto the channel (indexes 1 through 10).

Then the Producer produces index 11 and tries to put it onto the channel. At this point, the channel is at capacity (it has 10 items already), so the Producer waits (in an async-friendly way) for there to be space to write the next item.

And this is where the application hangs. The Producer will wait forever. Since "Task.WhenAll" is waiting for the Producer and Consumer to both finish, it will also wait forever. The application will not exit on its own.
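To see the waiting in isolation, here is a minimal sketch that is not part of the sample repository. It is a self-contained top-level program (C# 9 or later) that fills a bounded channel of capacity 10 with no reader running, checks whether the eleventh "WriteAsync" has completed, and then reads one item to free a slot. The variable names are illustrative; the calls are the standard System.Threading.Channels API.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel that holds at most 10 items (same capacity as the sample).
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

// Nothing is reading yet, so these 10 writes simply fill the channel.
for (int i = 0; i < 10; i++)
{
    await channel.Writer.WriteAsync(i);
}

// The 11th write cannot complete while the channel is full; it waits for space.
ValueTask pendingWrite = channel.Writer.WriteAsync(10);
bool waitingForSpace = !pendingWrite.IsCompleted;
Console.WriteLine($"Write of item 10 still waiting: {waitingForSpace}");

// Reading one item frees a slot, which lets the pending write finish.
int first = await channel.Reader.ReadAsync();
await pendingWrite;
Console.WriteLine($"Read {first}; items now in channel: {channel.Reader.Count}");
```

In the sample application, the Consumer never reads again after its exception, so the Producer's pending write is never released.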

No Handling of Exceptions in the Producer/Consumer

One cause of the behavior is that the Producer and Consumer do not handle their own exceptions. They let them bubble up and (hopefully) get caught in the Main method's try/catch block.

Consumer

Here is the code for the Consumer (in the "Program.cs" file in the "refactored" project):


    static async Task Consumer(ChannelReader<int> reader)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Consuming object: {item}");
            MightThrowExceptionForConsumer(item);
        }
    }

This uses the "ReadAllAsync" method on the "ChannelReader<T>" class to read the items off of the Channel as they become available.

The call to "MightThrowExceptionForConsumer" throws an exception. (Although it has "might" in the name, it is hard-coded to always throw an exception here.) And because the exception is unhandled, the "foreach" loop will exit.

This relies on the exception eventually making its way back to the "Main" method where it will be caught and logged in the catch block.
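The short-circuit is easy to reproduce in a small sketch. This is an illustration rather than repository code: "ConsumeAsync" is a hypothetical stand-in for the Consumer, and it throws on item 0 the way the hard-coded "MightThrowExceptionForConsumer" does. An unbounded channel with a completed writer keeps the example from hanging.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
for (int i = 0; i < 5; i++)
{
    channel.Writer.TryWrite(i);
}
channel.Writer.Complete();

int consumedCount = 0;
Task consumer = ConsumeAsync(channel.Reader);

try
{
    await consumer;
}
catch (InvalidOperationException ex)
{
    // Awaiting the faulted task re-raises the exception here.
    Console.WriteLine(ex.Message);
}

// The loop exited on item 0, so items 1 through 4 are still in the channel.
Console.WriteLine($"Faulted: {consumer.IsFaulted}, consumed: {consumedCount}, left: {channel.Reader.Count}");

// Hypothetical stand-in for the sample's Consumer: no try/catch inside the loop.
async Task ConsumeAsync(ChannelReader<int> reader)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        consumedCount++;
        if (item == 0)
            throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
    }
}
```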

Producer

The Producer code is similar (in the same "Program.cs" file in the "refactored" project):


    static async Task Producer(ChannelWriter<int> writer)
    {
        try
        {
            for (int i = 0; i < 20; i++)
            {
                Console.WriteLine($"Producing something: {i}");
                MightThrowExceptionForProducer(i);
                await Task.Delay(10);
                await writer.WriteAsync(i);
            }
        }
        finally
        {
            writer.Complete();
        }
    }

This uses a "for" loop to generate 20 items and write them to the Channel using "WriteAsync". This could also throw an exception in the "MightThrowExceptionForProducer" method (although for this sample, it is hard-coded to not throw an exception).

If an exception is thrown in the "for" loop, the loop will exit before finishing. And although we do not have a "catch" block to catch an exception, we do have a "finally" block.

In the "finally" block we mark the ChannelWriter as "Complete". This lets the Consumer know that no new items will be added to the Channel, and the Consumer can stop waiting for new items. This is in a "finally" block because even if we get an exception, we want to make sure that the ChannelWriter is marked "Complete".
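A minimal sketch of why the "finally" matters, assuming a hypothetical "ProduceAsync" that fails on its fourth item (index 3). Because "Complete" runs in the "finally" block, the reader's "ReadAllAsync" loop ends normally after the three items that were written, and the Producer's exception is still raised when its Task is awaited.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
Task producer = ProduceAsync(channel.Writer);

// ReadAllAsync only ends once the writer is marked complete.
var received = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    received.Add(item);
}

bool producerFailed = false;
try
{
    await producer;
}
catch (InvalidOperationException)
{
    producerFailed = true;
}
Console.WriteLine($"Received {received.Count} items; producer failed: {producerFailed}");

// Hypothetical Producer that fails partway through the loop.
async Task ProduceAsync(ChannelWriter<int> writer)
{
    try
    {
        for (int i = 0; i < 20; i++)
        {
            if (i == 3)
                throw new InvalidOperationException($"Bad thing happened in Producer ({i})");
            await writer.WriteAsync(i);
        }
    }
    finally
    {
        // Runs on success and on failure, so the reader is never left waiting.
        writer.Complete();
    }
}
```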

But as with the Consumer, the Producer relies on the exception bubbling up to the "Main" method where it should be logged in the "catch" block.

Adding Exception Handling to the Producer and Consumer

A more reliable way of dealing with exceptions is to handle them inside the Producer and the Consumer. This also has the effect of removing the dependency between the Producer and Consumer that we saw earlier. With the dependency removed, we can use a Bounded Channel if we need to, and we can also use "Task.WhenAll" to wait for the Producer and Consumer tasks to complete.

The code for this section is available in the "doesnt-stop" project.

Consumer

Here is the code for the updated Consumer (in the "Program.cs" file in the "doesnt-stop" project):

    static async Task Consumer(ChannelReader<int> reader)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            try
            {
                Console.WriteLine($"Consuming object: {item}");
                MightThrowExceptionForConsumer(item);
                TotalConsumed++;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
            }
        }
    }

Here we have added a try/catch block inside of the foreach loop. This means that if an exception is thrown, it will be caught and logged here, and the loop will continue to the next item.

The Consumer is no longer short-circuiting, so we no longer need to worry about the Consumer stopping before it has read all of the items from the Channel.

One other new item is "TotalConsumed++". This is a static variable that is updated with the total number of items successfully processed (it is after the "MightThrow..." method). This lets us track how many items were successful in the Consumer.
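Here is the same per-item try/catch as a small standalone sketch. "ProcessItem" is a hypothetical stand-in for "MightThrowExceptionForConsumer" that fails on every fourth item instead of randomly, and "failedItems" is an assumed list that records which items need to be redone.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();
for (int i = 0; i < 10; i++)
{
    channel.Writer.TryWrite(i);
}
channel.Writer.Complete();

int totalConsumed = 0;
var failedItems = new List<int>();   // hypothetical record of items to redo

await foreach (var item in channel.Reader.ReadAllAsync())
{
    try
    {
        ProcessItem(item);
        totalConsumed++;   // only counted if ProcessItem did not throw
    }
    catch (Exception ex)
    {
        // Log and keep going; the loop moves on to the next item.
        Console.WriteLine($"Logged: {ex.Message}");
        failedItems.Add(item);
    }
}

Console.WriteLine($"Total Consumed: {totalConsumed}; failed: {string.Join(", ", failedItems)}");

// Hypothetical stand-in for MightThrowExceptionForConsumer: fails on every 4th item.
static void ProcessItem(int item)
{
    if (item % 4 == 0)
        throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
}
```

With items 0 through 9, items 0, 4, and 8 are logged, and the loop still processes the other seven.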

Producer

We have something very similar in the Producer (in the "Program.cs" file in the "doesnt-stop" project):

    static async Task Producer(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 100; i++)
        {
            try
            {
                Console.WriteLine($"Producing something: {i}");
                MightThrowExceptionForProducer(i);
                await Task.Delay(10);
                TotalProduced++;
                await writer.WriteAsync(i);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
            }
        }

        writer.Complete();
    }

Inside the "for" loop, we have a try/catch block. So if an exception is thrown, it is caught and logged, and the loop processing will continue.

Marking the ChannelWriter "Complete" no longer needs to be in a "finally" block. We will not have any unhandled exceptions escape here (and if we did get one, it would probably be an unrecoverable one).

One other new item is "TotalProduced++". This is a static variable that is updated with the total number of items successfully produced (it is after the "MightThrow..." method). This lets us track how many items were successful in the Producer.
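Putting the two loops together gives a sketch of the whole "doesnt-stop" flow. It is not the repository code: the random checks are replaced with hypothetical deterministic rules (the Producer fails when the index is divisible by 3, the Consumer when the item is divisible by 7) so that the totals are repeatable. It keeps the bounded channel and "Task.WhenAll", which now complete because both loops handle their own exceptions.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

int totalProduced = 0;
int totalConsumed = 0;

// Same shape as the sample: a bounded channel and Task.WhenAll.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
Task producer = ProduceAsync(channel.Writer);
Task consumer = ConsumeAsync(channel.Reader);
await Task.WhenAll(producer, consumer);

Console.WriteLine($"Total Produced: {totalProduced}");
Console.WriteLine($"Total Consumed: {totalConsumed}");

async Task ProduceAsync(ChannelWriter<int> writer)
{
    for (int i = 0; i < 100; i++)
    {
        try
        {
            // Hypothetical deterministic stand-in for MightThrowExceptionForProducer.
            if (i % 3 == 0)
                throw new InvalidOperationException($"Bad thing happened in Producer ({i})");
            totalProduced++;
            await writer.WriteAsync(i);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Logged: {ex.Message}");
        }
    }
    writer.Complete();
}

async Task ConsumeAsync(ChannelReader<int> reader)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        try
        {
            // Hypothetical deterministic stand-in for MightThrowExceptionForConsumer.
            if (item % 7 == 0)
                throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
            totalConsumed++;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Logged: {ex.Message}");
        }
    }
}
```

With these rules, 34 of the 100 items fail in the Producer, and 10 of the remaining 66 fail in the Consumer, so the totals come out to 66 produced and 56 consumed.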

Throwing Exceptions

To make the output more interesting, the methods that might throw exceptions use a random number generator to see if an exception should be thrown (in the "Program.cs" file in the "doesnt-stop" project):

    private static void MightThrowExceptionForProducer(int item)
    {
        if (Randomizer.Next() % 3 == 0)
            throw new Exception($"Bad thing happened in Producer ({item})");
    }

    private static void MightThrowExceptionForConsumer(int item)
    {
        if (Randomizer.Next() % 50 == 0)
            throw new Exception($"Bad thing happened in Consumer ({item})");
    }

The Producer throws an exception 1 out of 3 times (approximately). The Consumer throws an exception 1 out of 50 times (approximately).

Program Output

So let's see how this program behaves. We only modified the Producer and Consumer code; the original "Main" and "ProducerConsumerWithExceptions" methods are unchanged. To give us more of a chance to get random exceptions, I also changed the code so that it produces 100 items (instead of 20).

Consuming object: 90
Logged: Bad thing happened in Producer (91)
Producing something: 92
Producing something: 93
Consuming object: 92
Producing something: 94
Consuming object: 93
Logged: Bad thing happened in Producer (94)
Producing something: 95
Producing something: 96
Consuming object: 95
Producing something: 97
Consuming object: 96
Producing something: 98
Consuming object: 97
Logged: Bad thing happened in Consumer (97)
Producing something: 99
Consuming object: 98
Consuming object: 99
Total Produced: 69
Total Consumed: 66
Done

This is just the tail end of the output. It shows several logged items (2 Producer errors and 1 Consumer error).

Notice that Total Produced (69) and Total Consumed (66) do not match. The Producer only produced 69 of the 100 items, so the Consumer had a chance to process at most 69 items. The Consumer also hit 3 errors of its own, so it successfully processed 3 fewer items (69 - 3 = 66).

"Done" is printed out at the bottom. This lets us know that the application ran to completion and exited.

A More Reliable Approach

Our application is more reliable when the Producer and Consumer can deal with their own exceptions. This is not always possible, but it is a good approach for the sample code that we have here.

But we always need to consider what our particular program is doing. I had a great question come up in a recent workshop. The developer asked, 
"If we get an error in the Consumer, can we just put the item back onto the Channel for it to try again?"
If the application has random hiccups that can be solved by re-processing, then maybe this is a good solution. But there is a problem: what if it isn't a random hiccup?

If the Consumer fails every time it tries to process a particular record, then putting it back on the Channel will only create an endless loop of failures.

One approach that we came up with was to have a separate Channel for errors. So if the Consumer failed to process an item, it would put it on the error channel. This would be processed in a different workflow. For example, the Consumer of the error channel could try to process the item again and then log an error if it was unsuccessful. This would give the application a chance to try again (to handle the random hiccups) and also to log errors for items that could not be processed at all.
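Here is one way the error channel idea could be sketched. All of the names and rules are hypothetical: "work" is the main channel, "errors" is the error channel, and "Process" fails permanently on multiples of 10 (a record that can never be processed) but only on the first attempt for other multiples of 4 (a random hiccup). The main Consumer marks the error channel complete when it finishes, which lets the error Consumer finish as well.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var work = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
var errors = Channel.CreateUnbounded<int>();   // separate channel for failed items
var succeeded = new ConcurrentBag<int>();
var logged = new ConcurrentBag<int>();

await Task.WhenAll(ProduceAsync(), ConsumeAsync(), ConsumeErrorsAsync());
Console.WriteLine($"Succeeded: {succeeded.Count}, logged as failed: {string.Join(", ", logged)}");

async Task ProduceAsync()
{
    for (int i = 1; i <= 20; i++)
    {
        await work.Writer.WriteAsync(i);
    }
    work.Writer.Complete();
}

// Main Consumer: a failed item goes to the error channel, not back onto "work".
async Task ConsumeAsync()
{
    await foreach (var item in work.Reader.ReadAllAsync())
    {
        try
        {
            Process(item, attempt: 1);
            succeeded.Add(item);
        }
        catch (Exception)
        {
            await errors.Writer.WriteAsync(item);
        }
    }
    errors.Writer.Complete();   // no more failed items will arrive
}

// Error Consumer: one retry, then log the item for later review.
async Task ConsumeErrorsAsync()
{
    await foreach (var item in errors.Reader.ReadAllAsync())
    {
        try
        {
            Process(item, attempt: 2);
            succeeded.Add(item);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Logged: {ex.Message}");
            logged.Add(item);
        }
    }
}

// Hypothetical processing rules: multiples of 10 always fail (a bad record);
// other multiples of 4 fail only on the first attempt (a random hiccup).
static void Process(int item, int attempt)
{
    if (item % 10 == 0)
        throw new InvalidOperationException($"Item {item} cannot be processed");
    if (item % 4 == 0 && attempt == 1)
        throw new InvalidOperationException($"Temporary failure on item {item}");
}
```

For items 1 through 20, items 4, 8, 12, and 16 succeed on the retry, while items 10 and 20 end up logged. Because failed items go to a different channel, a record that always fails is retried once instead of looping forever.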

Wrap Up

As with most code, there are a lot of options. When I'm writing code, I usually spend a lot more time thinking than typing. Working through different scenarios lets me come up with a solution at a high level and figure out how all of the pieces fit together. Typing the code is the easy part.

Happy Coding!

Thursday, October 5, 2023

Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels

In the last article, "Don't Use 'Task.WhenAll' for Interdependent Tasks", we saw how using WhenAll for tasks that are interdependent can cause an application to hang if one of the tasks throws an exception. As a solution, we changed from using "Task.WhenAll" to awaiting the tasks separately. But there are other approaches.

In this article, we will focus on breaking the dependency between the tasks. In the sample code, the channel is bounded (meaning, it has a maximum capacity). This can create a dependency between the Producer and Consumer. One way to break the dependency is to use an unbounded channel.

A Bounded Channel can create a dependency between a Producer and Consumer. Using an Unbounded Channel is one way to break that dependency.

Let's do a quick review of the code that we saw yesterday and then look at how changing the channel can affect the code.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

One reason for the behavior was the way that a bounded channel created a dependency between the Producer and Consumer. If we can break that dependency, the behavior will change.

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior) is available here: https://github.com/jeremybytes/channel-exceptions.

Interdependent Tasks

We'll go back to the starting place we used in the previous article. Here are the relevant methods (available in the Program.cs file of the "refactored" project):


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

I won't go through this code again here. Check the previous article for a walkthrough. This code works fine for the success state, but it has strange behavior if the Consumer throws an exception.

Consumer Exception

If the Consumer throws an exception while processing an item, the entire application hangs:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11

Here's a reminder of what happens:

In the "ProducerConsumerWithExceptions" method, we create a Bounded Channel. This means that the capacity of the channel is limited to 10 items. When the channel has reached that capacity, no new items can be written to it until space is made available for them.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 10 more items onto the channel (indexes 1 through 10).

Then the Producer produces index 11 and tries to put it onto the channel. At this point, the channel is at capacity (it has 10 items already), so the Producer waits (in an async-friendly way) for there to be space to write the next item.

And this is where the application hangs. The Producer will wait forever. Since "Task.WhenAll" is waiting for the Producer and Consumer to both finish, it will also wait forever. The application will not exit on its own.

Using an Unbounded Channel

What I would like to do is remove the dependency between the Producer and Consumer. Even if the Consumer breaks, the Producer should keep going. One way of doing this is to use an Unbounded Channel (meaning, a channel with no capacity limit).

This takes only a small change to the "ProducerConsumerWithExceptions" method. (This code is in the Program.cs file of the "unbounded-channel" project):


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateUnbounded<int>();

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

Now the Channel is no longer limited to 10 items.
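A quick sketch (not from the repository) shows the difference: with no reader at all, every write to an unbounded channel succeeds immediately. It is a self-contained top-level C# program.

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

// With no capacity limit, every write succeeds right away, even with no reader.
int accepted = 0;
for (int i = 0; i < 20; i++)
{
    if (channel.Writer.TryWrite(i))
        accepted++;
}

Console.WriteLine($"Accepted {accepted} items; {channel.Reader.Count} waiting to be read");
```

The trade-off is that nothing limits how many items can pile up in the channel if the Consumer stops reading.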

Error Output

Let's take a look at the error output now:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11
Producing something: 12
Producing something: 13
Producing something: 14
Producing something: 15
Producing something: 16
Producing something: 17
Producing something: 18
Producing something: 19
Bad thing happened in Consumer (0)
Done

The most important part of this output is that we have the "Done" message. This tells us that the application finished on its own. We also see the error message "Bad thing happened in Consumer (0)" just before the "Done".

Here's how things work with this code.

In the "ProducerConsumerWithExceptions" method, we create an Unbounded Channel. This means that the capacity of the channel is not limited.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 19 more items onto the channel (indexes 1 through 19). Once it has produced all 20 items, the Producer completes.

The Producer Task has completed (successfully) and the Consumer Task has completed (with error). This means that "await Task.WhenAll" can move forward.

Since one of the tasks got an exception, the "await" will raise that exception here. But "ProducerConsumerWithExceptions" does not handle the exception, so it bubbles up to the "Main" method.

The "Main" method has a try/catch block. It catches the thrown exception, prints the error message to the console, outputs "Done", and then the application exits.
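The sequence can be reproduced with two stand-in tasks. This sketch is illustrative only: "Task.Delay" plays the part of the Producer that finishes successfully, and the hypothetical local function "FailAsync" plays the Consumer that fails.

```csharp
using System;
using System.Threading.Tasks;

// Stand-ins: the "Producer" finishes normally, the "Consumer" fails.
Task producer = Task.Delay(50);
Task consumer = FailAsync();

string message = "";
try
{
    // Waits for both tasks to complete, then raises the Consumer's exception.
    await Task.WhenAll(producer, consumer);
}
catch (InvalidOperationException ex)
{
    message = ex.Message;
}

Console.WriteLine($"Producer: {producer.Status}, Consumer: {consumer.Status}");
Console.WriteLine(message);

static async Task FailAsync()
{
    await Task.Yield();
    throw new InvalidOperationException("Bad thing happened in Consumer (0)");
}
```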

Bounded vs. Unbounded Channels

What we've seen here is that using a Bounded Channel can create a dependency between a Producer and Consumer. If the Consumer fails and the Channel reaches capacity, the Producer will never complete.

With an Unbounded Channel, we no longer have that same dependency.

I say that a Bounded Channel "can" create a dependency because it does not always do so. Some ways of writing to a Channel do not create a dependency at all.

Writing to a Channel

There are multiple ways of writing to a channel. In this code, the Producer uses "WriteAsync":
    await writer.WriteAsync(i);

"WriteAsync" is a method on the ChannelWriter<T> class. If the channel is at capacity, it will wait until there is space available. This is the easiest way to write to a channel, so I tend to use this most of the time. It does have the downside of possibly waiting forever if space is never made available.

But there are other approaches. The ChannelWriter<T> also has a "TryWrite" method:
    bool success = writer.TryWrite(i);

"TryWrite" is not asynchronous. If it writes to the channel successfully, it returns "true". But if the channel is at capacity and the data cannot be written, it returns "false".

This has the advantage of not waiting forever. The disadvantage is that we need a little more code to handle the workflow. What happens if "TryWrite" returns "false"? Do we wait a bit and retry? Do we pause our Producer? Do we simply discard that item? Do we log the item and go to the next one? It depends on what our application needs to do. But the flexibility is there for us to decide.
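As a sketch of one of those choices, here is a loop that logs and skips any item that does not fit. The capacity of 3 and the "skipped" list are assumptions for the example; nothing reads from the channel, so it fills up after three items.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Small capacity and no reader, so the channel fills up quickly.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3));
var skipped = new List<int>();

for (int i = 0; i < 5; i++)
{
    // TryWrite never waits: it returns false if the item cannot be written now.
    if (!channel.Writer.TryWrite(i))
    {
        // One possible policy: log the item and move on to the next one.
        Console.WriteLine($"Channel full, skipped item {i}");
        skipped.Add(i);
    }
}

Console.WriteLine($"Written: {channel.Reader.Count}, skipped: {skipped.Count}");
```

Items 0 through 2 are written, and items 3 and 4 are skipped because there is no space for them.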

To help us with the workflow, there is also a "WaitToWriteAsync" method:
    await writer.WaitToWriteAsync();

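If the channel is at capacity, this will wait (in an async-friendly way) for space to be available. It returns "true" when there is space, or "false" if the channel has been marked complete. We can use this to be pretty sure that "TryWrite" will succeed. (But with parallel code, that is not guaranteed: another writer may fill the space first.)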

When we await "WaitToWriteAsync", however, we are back to a situation where we may end up waiting forever if the channel is at capacity and no items are read off of it.
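When "WaitToWriteAsync" and "TryWrite" are used together, a common shape is a loop: wait for space, try to write, and wait again if another writer got there first. This is a sketch with an assumed capacity of 2 and a single Producer; "ProduceAsync" is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2));
Task producer = ProduceAsync(channel.Writer);

var received = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    received.Add(item);
}
await producer;
Console.WriteLine($"Received {received.Count} items");

async Task ProduceAsync(ChannelWriter<int> writer)
{
    for (int i = 0; i < 10; i++)
    {
        // WaitToWriteAsync returns false once the channel has been completed.
        while (await writer.WaitToWriteAsync())
        {
            // Space was available a moment ago, but another writer could have
            // taken it, so TryWrite can still fail; in that case, wait again.
            if (writer.TryWrite(i))
                break;
        }
    }
    writer.Complete();
}
```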

Cancellation Tokens

The async methods on ChannelWriter<T> also take optional cancellation tokens.
    await writer.WriteAsync(i, cancellationToken);

We could be pretty creative with a cancellation token. For example, if the Consumer throws an exception, it could also cancel the CancellationTokenSource that the token came from, putting the token into the "cancellation requested" state. Then "WriteAsync" would stop waiting (by throwing an OperationCanceledException).

The same is true if we use a cancellation token with "WaitToWriteAsync".
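Here is a sketch of that idea. It is not code from the sample repository: "cts" is an assumed CancellationTokenSource shared by the two tasks, and the Consumer cancels it before rethrowing. The Producer's next "WriteAsync", which would otherwise wait forever on the full channel, throws an OperationCanceledException; its "finally" block completes the writer, and "Task.WhenAll" can finish.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
using var cts = new CancellationTokenSource();

Task producer = ProduceAsync(channel.Writer, cts.Token);
Task consumer = ConsumeAsync(channel.Reader, cts);

string message = "";
try
{
    await Task.WhenAll(producer, consumer);
}
catch (Exception ex)
{
    message = ex.Message;
}
Console.WriteLine($"Producer: {producer.Status}, Consumer: {consumer.Status}");
Console.WriteLine(message);

async Task ProduceAsync(ChannelWriter<int> writer, CancellationToken token)
{
    try
    {
        for (int i = 0; i < 20; i++)
        {
            // Stops waiting (with OperationCanceledException) once the token is canceled.
            await writer.WriteAsync(i, token);
        }
    }
    finally
    {
        writer.Complete();
    }
}

async Task ConsumeAsync(ChannelReader<int> reader, CancellationTokenSource source)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        try
        {
            if (item == 0)
                throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
        }
        catch
        {
            source.Cancel();   // release the Producer if it is waiting for space
            throw;
        }
    }
}
```

The Producer's Task ends up canceled and the Consumer's Task faulted, so awaiting "Task.WhenAll" raises the Consumer's exception instead of hanging.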

A Lot of Options

As with many things in C#, Channel<T> has a lot of options. This can be good because of the flexibility that we have as programmers. If we need to do something a little unusual, there are often methods and properties that help us out. The downside is that there is a bit more to learn, and it can be confusing to figure out which approach is the best one for your application.

More Solutions

So we've seen a couple of solutions to fix the weird behavior of the sample application when exceptions are thrown.

If we await the Producer and Consumer tasks separately, we do not need to be as concerned about them being dependent on one another.

If we change the Channel from Bounded to Unbounded, we remove the dependency between the Producer and Consumer.

If we add some exception handling inside the Producer and Consumer Tasks, we can continue processing data even if there are errors. (This approach is shown in the "doesnt-stop" project in the GitHub repository: https://github.com/jeremybytes/channel-exceptions.) We'll look at this in a future article.

Wrap Up

As we dive deeper into different ways of using asynchronous and parallel bits in our code, we often find strange behavior. And sometimes it can take a while to figure out where the strangeness is coming from. But the more that we learn, the easier it gets. So keep learning and keep writing amazing applications that make your users happy.

Happy Coding!

Wednesday, October 4, 2023

Don't Use "Task.WhenAll" for Interdependent Tasks

Using "Task.WhenAll" gives us a chance to pause our code until multiple tasks are complete. In addition, if there is an unhandled exception in one of those tasks, awaiting the WhenAll will raise that thrown exception so we can deal with it. This all works fine if our tasks are not dependent on each other. But if the tasks are interdependent, this can cause the application to hang.

Short Version: 
"Task.WhenAll" works great for independent tasks. If the tasks are dependent on one another, it can be better to await the individual tasks instead.
When we start using tasks and parallel code in our applications, we can run into some weirdness that can be difficult to debug. Today we'll look at some of the weirdness that can occur around "Task.WhenAll" and understand how to alleviate it.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

One reason for the behavior was the way that "await Task.WhenAll" was used in the code. We will see some of that code and a solution to it after a bit of an overview of how I generally use "Task.WhenAll".

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior) is available here: https://github.com/jeremybytes/channel-exceptions.

Awaiting Non-Dependent Tasks

I use "Task.WhenAll" in a couple of scenarios. The first is when I want to run several of the same tasks in parallel, and I need to wait for all of them to complete.

Parallel Tasks

Here is some code from my workshop on "Asynchronous and Parallel Programming in C#". (Alert: The last scheduled public workshop on this topic is coming up soon: https://jeremybytes.blogspot.com/2023/09/last-chance-full-day-workshop-on.html.)


    static async Task RunWithContinuation(List<int> ids)
    {
        List<Task> allContinuations = new();

        foreach (var id in ids)
        {
            Task<Person> personTask = reader.GetPersonAsync(id);
            Task continuation = personTask.ContinueWith(task => ...);
            allContinuations.Add(continuation);
        }

        await Task.WhenAll(allContinuations);
    }

Inside the "foreach" loop, this code runs a method (GetPersonAsync) that returns a Task, and then sets up a continuation to run when that first task is complete. We do not "await" the tasks inside the loop because we want to run them in parallel.

Each continuation also returns a Task (which is named "continuation"). The trick to this method is that even though I want all of these tasks to run in parallel, I do not want to exit the method until they are all complete.

This is where "Task.WhenAll" comes in.

The continuation tasks are collected in a list (called "allContinuations"). Then I pass that collection to "Task.WhenAll". When I "await" Task.WhenAll, the code will pause (in a nice async-friendly way) until all of the continuation tasks are complete. This means that the method ("RunWithContinuation") will not exit until all of the tasks have completed.

These tasks are not dependent on each other. Each is its own atomic task: it gets a "Person" record based on an "id" parameter and then uses the result of that. (The body of the continuation is hidden, but it takes the "Person" record and outputs it for the user.)

If any of these continuation tasks throws an exception, then that exception is raised when we "await Task.WhenAll".

But here is one key point:
The exception is not raised until after all of the tasks have completed.
And here "completed" means that it finished successfully, with an error, or by being canceled.

Since these tasks are not dependent on each other, they can all complete even if one (or more) of the tasks fails.
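A small sketch (separate from the workshop code) shows this timing. The hypothetical "FailFastAsync" fails after about 10 milliseconds, while the other tasks are simple delays that are still running at that point.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var tasks = new List<Task>
{
    FailFastAsync(),     // fails almost immediately
    Task.Delay(200),     // still running when the first task fails
    Task.Delay(300),
};

try
{
    await Task.WhenAll(tasks);
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Caught: {ex.Message}");
}

// By the time the exception is raised, every task has finished.
bool allCompleted = tasks.All(t => t.IsCompleted);
Console.WriteLine($"All tasks completed: {allCompleted}");

static async Task FailFastAsync()
{
    await Task.Delay(10);
    throw new InvalidOperationException("Bad thing happened early");
}
```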

Related (but not Dependent) Tasks

As another example, here is some code from a hands-on lab (from the same workshop mentioned above). This particular lab is about dealing with AggregateExceptions.


    await Task.WhenAll(orderDetailsTask, customerTask, productTask);

In this code, we have 3 separate tasks: one to get order details, one to get a customer, and one to get a list of products. These 3 pieces are used to assemble a complete "Order". (Please don't write to me about how this isn't a good way to assemble a complex object; it is code specifically to allow for digging into AggregateException.)

Similar to above, the "await" will cause this code to pause until all 3 of the tasks have completed. If there is an exception in any of the tasks, it will be raised here. As noted above, the exception will only be raised after all 3 tasks are complete.

These tasks are related to each other, but they are not dependent on each other. So one task failing will not stop the other tasks from completing.
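The lab's code is not shown here, so this is a hypothetical sketch with three stand-in tasks, two of which fail. It shows the difference between what "await" raises and what the Task returned by "Task.WhenAll" keeps.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins for the lab's three tasks; two of them fail.
Task orderDetailsTask = FailAsync("order details");
Task customerTask = FailAsync("customer");
Task productTask = Task.Delay(50);

Task allTasks = Task.WhenAll(orderDetailsTask, customerTask, productTask);
string firstMessage = "";
try
{
    await allTasks;
}
catch (Exception ex)
{
    // "await" re-raises only the first exception.
    firstMessage = ex.Message;
}

// The task returned by WhenAll keeps every exception in an AggregateException.
int failureCount = allTasks.Exception?.InnerExceptions.Count ?? 0;
Console.WriteLine($"First: {firstMessage}; total failures: {failureCount}");

static async Task FailAsync(string what)
{
    await Task.Delay(10);
    throw new InvalidOperationException($"Could not load {what}");
}
```

The "await" raises the first exception (from "orderDetailsTask"), while the "Exception" property of the combined Task is an AggregateException that holds both failures.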

Interdependent Tasks

Now that we have a bit of an introduction to how "Task.WhenAll" behaves, let's look at a situation where the tasks are dependent on each other. As we'll see, this can cause us some problems.

I refactored the code originally provided by Edington Watt and the WTW ICT Technology team to show different behaviors in the code. You can get the original code (and the various solutions) here: https://github.com/jeremybytes/channel-exceptions.

Starting Code

We'll start at the root of the application. (The code can be found in the "Program.cs" file of the "refactored" project.)


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

When we "await" a Task that throws an exception, that exception is raised in our code. (If we do not await the Task, then we have to go looking for the exception in the Task properties.)
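A minimal sketch of the difference, using a hypothetical "FailAsync" method. Before the task is awaited, the exception is only visible through the Task's "Status" and "Exception" properties; awaiting the same Task raises it where a try/catch can handle it.

```csharp
using System;
using System.Threading.Tasks;

Task failing = FailAsync();

// Not awaited: wait for the task to finish without throwing,
// then look for the exception in the Task's properties.
await Task.WhenAny(failing);
Console.WriteLine($"Status: {failing.Status}");
Console.WriteLine($"Stored exception: {failing.Exception?.InnerException?.Message}");

// Awaited: the same exception is raised in our code.
string raised = "";
try
{
    await failing;
}
catch (InvalidOperationException ex)
{
    raised = ex.Message;
}
Console.WriteLine($"Raised by await: {raised}");

static async Task FailAsync()
{
    await Task.Delay(10);
    throw new InvalidOperationException("Bad thing happened");
}
```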

The idea behind this code is that if an exception is thrown in the main "ProducerConsumerWithExceptions" method (that returns a Task), we can catch it and output a message to the console.

Before looking at the behavior, let's see what "ProducerConsumerWithExceptions" does.


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

This code uses the Producer/Consumer pattern with a Channel in the middle. The idea is that the Producer can produce data in parallel and then put the data onto the Channel. The Consumer reads data off of the Channel and does something with it (in this case, it outputs it to the console).

This code has an "await Task.WhenAll". So this method will not exit until both of these tasks (the producer and the consumer) have completed. And if either of the tasks throws an exception, it will be raised here. Since we are not handling the exception in this method, it will bubble up and be handled in the "Main" method above.

Success State
If both the producer and consumer complete successfully, we get output similar to the following:


    ...
    Producing something: 12
    Consuming object: 11
    Producing something: 13
    Consuming object: 12
    Producing something: 14
    Consuming object: 13
    Producing something: 15
    Consuming object: 14
    Producing something: 16
    Consuming object: 15
    Producing something: 17
    Consuming object: 16
    Producing something: 18
    Consuming object: 17
    Producing something: 19
    Consuming object: 18
    Consuming object: 19
    Done

This is just the end of the output, but we can see that the producer is producing objects and the consumer is consuming objects. Once all 20 items (indexes 0 through 19) have been produced and consumed, the application prints "Done" and exits.

Error Behavior
Things get strange when we get an exception. Here is the output when the Consumer throws an exception on reading the first record (index 0):


    Producing something: 0
    Producing something: 1
    Consuming object: 0
    Producing something: 2
    Producing something: 3
    Producing something: 4
    Producing something: 5
    Producing something: 6
    Producing something: 7
    Producing something: 8
    Producing something: 9
    Producing something: 10
    Producing something: 11

From the output, the Producer produced 12 items (and then stopped). The Consumer throws an exception while reading the first record, so we only see output for 1 item here.

At first, it seems strange that the exception message does not appear in the output; after all, the try/catch block in the Main method should print it.

But here's the real problem: The application is still running!

Interdependent Tasks
The application is stuck on awaiting Task.WhenAll.


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

The code is hung because the Producer and Consumer are interdependent. The reason for this is a bit subtle if you're not familiar with how Channel<T> works in C#.

On the first line of this method, we create a bounded channel. A bounded channel is limited to holding a certain number of items. In this case, the channel is limited to holding 10 items. Once the channel has reached capacity, no new items can be written until one has been read off.
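Here is a small sketch of that capacity behavior, using a capacity of 2 instead of 10 to keep it short (the default "BoundedChannelFullMode.Wait" is assumed). "TryWrite" reports a full channel by returning false, while "WriteAsync" returns a write that stays pending until a read frees up a slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity of 2 (the article uses 10); the default FullMode is Wait.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2));

bool first = channel.Writer.TryWrite(0);   // room available
bool second = channel.Writer.TryWrite(1);  // channel is now at capacity
bool third = channel.Writer.TryWrite(2);   // no room, so this returns false

// WriteAsync does not fail when the channel is full;
// it returns a ValueTask that stays pending.
ValueTask pendingWrite = channel.Writer.WriteAsync(2);
bool pendingBeforeRead = !pendingWrite.IsCompleted;

// Reading one item frees a slot, which lets the pending write finish.
channel.Reader.TryRead(out int firstRead);
await pendingWrite;

Console.WriteLine($"{first} {second} {third} pending={pendingBeforeRead} read={firstRead}");
```

If nothing ever reads from the channel, that pending write never completes, which is exactly the situation the Producer ends up in.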

So here's what is happening. The Producer starts producing data (with index 0) and putting it onto the channel. The Consumer reads the first item (index 0) off of the channel and then throws an exception. The Consumer task is now faulted (meaning that an exception was thrown in the Task), and a faulted task counts as "complete".

But the Producer keeps working. It keeps producing records until the channel reaches capacity (holding indexes 1 through 10). It then produces index 11 and then pauses. The code to write to the channel is waiting (in an async-friendly way) for there to be space in the channel. But the channel is at capacity. And since the Consumer is faulted, no more items will be read off of the channel. So the Producer is left waiting forever.

Since the Producer does not complete, the "await Task.WhenAll" will also wait forever. Because of this, the exception is never raised in this part of the code, so the "Main" try/catch block never has a chance to handle it. The application will never complete on its own.
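One way to confirm the hang without freezing a terminal is to race "Task.WhenAll" against a timer using "Task.WhenAny". This is a diagnostic sketch, not the repo's code: "ProduceAsync" and "FaultyConsumeAsync" are hypothetical stand-ins that mimic the failure described above (a consumer that throws on index 0 and a producer writing 20 items into a channel with a capacity of 10).

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

Task producer = ProduceAsync(channel.Writer);
Task consumer = FaultyConsumeAsync(channel.Reader);
Task whenAll = Task.WhenAll(producer, consumer);

// Race WhenAll against a 2-second timer instead of awaiting it directly.
Task winner = await Task.WhenAny(whenAll, Task.Delay(TimeSpan.FromSeconds(2)));

bool hung = winner != whenAll;
Console.WriteLine($"WhenAll finished: {!hung}");
Console.WriteLine($"Consumer faulted: {consumer.IsFaulted}, producer completed: {producer.IsCompleted}");

// Hypothetical producer: 20 items, waiting whenever the channel is full.
static async Task ProduceAsync(ChannelWriter<int> writer)
{
    for (int i = 0; i < 20; i++)
        await writer.WriteAsync(i);
    writer.Complete();
}

// Hypothetical consumer that fails on the very first item it reads.
static async Task FaultyConsumeAsync(ChannelReader<int> reader)
{
    int item = await reader.ReadAsync();
    throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
}
```

Because "Task.WhenAny" never throws, the program can report that the Consumer is faulted while the Producer is still waiting for space, and then exit on its own.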

One Solution - "await" Tasks Separately

One solution to this particular problem is to "await" the Producer and Consumer tasks separately. Here is what that code looks like. (The code can be found in the "Program.cs" file of the "separate-await" project.)

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await consumer;
        await producer;
    }

Notice that instead of using "await Task.WhenAll", we now have separate "await consumer" and "await producer" sections. Since our producer is dependent upon the consumer making space available in the channel, it is best to wait for the consumer first here. (It's okay if the subtleties of that get lost; parallel async code is "fun".)

Here is the output from the code above:

    Producing something: 0
    Producing something: 1
    Consuming object: 0
    Producing something: 2
    Producing something: 3
    Bad thing happened in Consumer (0)
    Producing something: 4
    Done

The output shows the Producer producing items (indexes 0 through 4 in this case). The Consumer pulls the first item (index 0) from the channel and then throws an exception.

The line "await consumer" above waits for the Consumer to complete. As soon as the Consumer task throws an exception, it is complete (in the "faulted" state). And since we awaited the faulted task, the exception gets raised.

The exception is not handled in this method, so the method short-circuits (meaning we never get to the "await producer" line). The exception bubbles up to the "Main" method, and that try/catch block outputs the exception message to the console ("Bad thing happened in Consumer (0)").

Most importantly, we see the "Done" message which means that our application exited on its own.

Side note: We get the message "Producing something: 4" after the exception message because the Producer is still running its code concurrently. So, the Producer has a chance to produce one more item before the application exits.

Task.WhenAll and Interdependent Tasks

So we've seen that if we have interdependent tasks, awaiting Task.WhenAll can lead our program to hang. In this situation, one task failing (the consumer) prevented the other task from completing (the producer). Since not all tasks complete, "await Task.WhenAll" ends up waiting forever.
If we have interdependent tasks, it is better to await the tasks separately. If we have tasks which are not dependent (such as the parallel and related examples above), then "await Task.WhenAll" usually works just fine.
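For independent tasks, it is also worth knowing that "await" rethrows only one exception, while the task returned by "Task.WhenAll" holds all of them in its "Exception" property. A minimal sketch with a hypothetical "FaultAfterAsync" helper:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

Task a = FaultAfterAsync("A", 10);
Task b = FaultAfterAsync("B", 20);
Task c = Task.Delay(30); // succeeds

Task all = Task.WhenAll(a, b, c);
string firstMessage = "";
try
{
    await all;
}
catch (InvalidOperationException ex)
{
    // "await" rethrows only one of the exceptions...
    firstMessage = ex.Message;
}

// ...but the WhenAll task keeps every failure in its AggregateException.
string[] allMessages = all.Exception!.InnerExceptions.Select(e => e.Message).ToArray();
Console.WriteLine($"First: {firstMessage}; all: {string.Join(", ", allMessages)}");

// Hypothetical helper: fails with a named message after a delay.
static async Task FaultAfterAsync(string name, int delayMs)
{
    await Task.Delay(delayMs);
    throw new InvalidOperationException($"Task {name} failed");
}
```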

More Solutions

There are other ways to fix the strange behavior of the sample application. You can take a look at some of the other solutions in the GitHub repository: https://github.com/jeremybytes/channel-exceptions.

Another solution is to change the bounded channel to an unbounded channel (in the "unbounded-channel" project). This gets rid of the capacity limitation and removes the dependency between the Producer task and Consumer task. [Update - article available: Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels]
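A quick sketch of why that removes the dependency: writes to an unbounded channel succeed immediately even when nothing is reading, so the Producer never waits on the Consumer. The trade-off worth keeping in mind is that items can pile up in memory if the Consumer stops.

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

// With no reader at all, every write still succeeds immediately.
int accepted = 0;
for (int i = 0; i < 20; i++)
{
    if (channel.Writer.TryWrite(i))
        accepted++;
}
Console.WriteLine($"Accepted {accepted} of 20 writes with no consumer");
```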

Both this solution and the one presented in this article are short-circuiting: if the Consumer throws an exception on one item, no other items will be consumed. The same is true of the Producer.

The GitHub repository also provides a more robust implementation (in the "doesnt-stop" project). In this project, if the Consumer throws an exception while processing an item, it will log the error and continue processing. (The same is true of the Producer in this project.)
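The "doesnt-stop" project in the repository is the place to see the actual implementation. As a rough, hypothetical sketch of the idea only: moving the try/catch inside the Consumer's read loop lets one bad item be logged while processing continues. The failure condition (every fifth item) and the "Logged:" console output are stand-ins for real work and real logging.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
var processed = new List<int>();
var failed = new List<int>();

Task producer = ProduceAsync(channel.Writer);
Task consumer = ConsumeAsync(channel.Reader, processed, failed);
await consumer;
await producer;
Console.WriteLine($"Processed {processed.Count}, failed {failed.Count}");

// Hypothetical producer: 20 items, then completes the channel.
static async Task ProduceAsync(ChannelWriter<int> writer)
{
    for (int i = 0; i < 20; i++)
        await writer.WriteAsync(i);
    writer.Complete();
}

// The try/catch is inside the loop, so one bad item is logged and skipped
// instead of faulting the whole consumer task.
static async Task ConsumeAsync(ChannelReader<int> reader, List<int> processed, List<int> failed)
{
    await foreach (int item in reader.ReadAllAsync())
    {
        try
        {
            if (item % 5 == 0) // hypothetical failure condition for the demo
                throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
            processed.Add(item);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Logged: {ex.Message}");
            failed.Add(item);
        }
    }
}
```

Since the Consumer keeps reading, the channel never stays full, so the Producer is never stuck waiting and both tasks complete.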

Wrap Up

As we dive deeper into different ways of using asynchronous and parallel bits in our code, we often find strange behavior. And sometimes it can take a while to figure out where the strangeness is coming from. But the more that we learn, the easier it gets. So keep learning and keep writing amazing applications that make your users happy.

Happy Coding!