Wednesday, October 4, 2023

Don't Use "Task.WhenAll" for Interdependent Tasks

Using "Task.WhenAll" gives us a chance to pause our code until multiple tasks are complete. In addition, if there is an unhandled exception in one of those tasks, awaiting the WhenAll will raise that thrown exception so we can deal with it. This all works fine if our tasks are not dependent on each other. But if the tasks are interdependent, this can cause the application to hang.

Short Version: 
"Task.WhenAll" works great for independent tasks. If the tasks are dependent on one another, it can be better to await the individual tasks instead.
When we start using tasks and parallel code in our applications, we can run into some weirdness that can be difficult to debug. Today we'll look at some of the weirdness that can occur around "Task.WhenAll" and understand how to alleviate it.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

One reason for the behavior was the way that "await Task.WhenAll" was used in the code. We will see some of that code and a solution to it after a bit of an overview of how I generally use "Task.WhenAll".

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior is available here: https://github.com/jeremybytes/channel-exceptions.

Articles

Awaiting Non-Dependent Tasks

I use "Task.WhenAll" in a couple of scenarios. The first is when I want to run several of the same tasks in parallel, and I need to wait for all of them to complete.

Parallel Tasks

Here is some code from my workshop on "Asynchronous and Parallel Programming in C#". (Alert: The last scheduled public workshop on this topic is coming up soon: https://jeremybytes.blogspot.com/2023/09/last-chance-full-day-workshop-on.html.)


    static async Task RunWithContinuation(List<int> ids)
    {
        List<Task> allContinuations = new();

        foreach (var id in ids)
        {
            Task<Person> personTask = reader.GetPersonAsync(id);
            Task continuation = personTask.ContinueWith(task => ...);
            allContinuations.Add(continuation);
        }

        await Task.WhenAll(allContinuations);
    }

Inside the "foreach" loop, this code runs a method (GetPersonAsync) that returns a Task, and then sets up a continuation to run when that first task is complete. We do not "await" the tasks inside the loop because we want to run them in parallel.

Each continuation also returns a Task (which is named "continuation"). The trick to this method is that even though I want all of these tasks to run in parallel, I do not want to exit the method until they are all complete.

This is where "Task.WhenAll" comes in.

The continuation tasks are collected in a list (called "allContinuations"). Then I pass that collection to "Task.WhenAll". When I "await" Task.WhenAll, the code will pause (in a nice async-friendly way) until all of the continuation tasks are complete. This means that the method ("RunWithContinuation") will not exit until all of the tasks have completed.

These tasks are not dependent on each other. Each is its own atomic task: it gets a "Person" record based on an "id" parameter and then uses the result of that. (The body of the continuation is hidden, but it takes the "Person" record and outputs it for the user.)

If any of these continuation tasks throws an exception, then that exception is raised when we "await Task.WhenAll".

But here is one key point:
The exception is not raised until after all of the tasks have completed.
And here "completed" means that it finished successfully, with an error, or by being canceled.

Since these tasks are not dependent on each other, they can all complete even if one (or more) of the tasks fails.

Related (but not Dependent) Tasks

As another example, here is some code from a hands-on lab (from the same workshop mentioned above). This particular lab is about dealing with AggregateExceptions.


    await Task.WhenAll(orderDetailsTask, customerTask, productTask);

In this code, we have 3 separate tasks: one to get order details, one to get a customer, and one to get a list of products. These 3 pieces are used to assemble a complete "Order". (Please don't write to me about how this isn't a good way to assemble a complex object; it is code specifically to allow for digging into AggregateException.)

Similar to above, the "await" will cause this code to pause until all 3 of the tasks have completed. If there is an exception in any of the tasks, it will be raised here. As noted above, the exception will only be raised after all 3 tasks are complete.

These tasks are related to each other, but they are not dependent on each other. So one task failing will not stop the other tasks from completing.

Interdependent Tasks

Now that we have a bit of an introduction to how "Task.WhenAll" behaves, let's look at a situation where the tasks are dependent on each other. As we'll see, this can cause us some problems.

I refactored the code originally provided by Edington Watt and the WTW ICT Technology team to show different behaviors in the code. You can get the original code (and the various solutions) here: https://github.com/jeremybytes/channel-exceptions.

Starting Code

We'll start at the root of the application. (The code can be found in the "Program.cs" file of the "refactored" project.)


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

When we "await" a Task that throws an exception, that exception is raised in our code. (If we do not await the Task, then we have to go looking for the exception in the Task properties.)

The idea behind this code is that if an exception is thrown in the main "ProducerConsumerWithExceptions" method (that returns a Task), we can catch it and output a message to the console.

Before looking at the behavior, lets see what "ProducerConsumerWithExceptions" does.


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

This code uses the Producer/Consumer pattern with a Channel in the middle. The idea is that the Producer can produce data in parallel and then put the data onto the Channel. The Consumer reads data off of the Channel and does something with it (in this case, it outputs it to the console).

This code has an "await Task.WhenAll". So this method will not exit until both of these tasks (the produder and the consumer) have completed. And if either of the tasks throws an exception, it will be raised here. Since we are not handling the exception in this method, it will bubble up and be handled in the "Main" method above.

Success State
If both the producer and consumer complete successfully, we get output similar to the following:


...
Producing something: 12
Consuming object: 11
Producing something: 13
Consuming object: 12
Producing something: 14
Consuming object: 13
Producing something: 15
Consuming object: 14
Producing something: 16
Consuming object: 15
Producing something: 17
Consuming object: 16
Producing something: 18
Consuming object: 17
Producing something: 19
Consuming object: 18
Consuming object: 19
Done
This is is just showing the end part of the output, but we can see that the producer is producing objects and the consumer is consuming objects. Once 20 items have been produced (starting with index 0), the application prints "Done" and exits.

Error Behavior
Things get strange when we get an exception. Here is the output when the Consumer throws an exception on reading the first record (index 0):


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11

From the output, the Producer produced 12 items (and then stopped). The Consumer throws an exception while reading the first record, so we only see output for 1 item here.

At first it seems strange that we do not see the exception in the output; after all, that is part of the try/catch block in the Main method.

But here's the real problem: The application is still running!

Interdependent Tasks
The application is stuck on awaiting Task.WhenAll.


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

The code is hung because the Producer and Consumer are interdependent. The reason for this is a bit subtle if you're not familiar with how Channel<T> works in C#.

On the first line of this method, we create a bounded channel. A bounded channel is limited to holding a certain number of items. In this case, the channel is limited to holding 10 items. Once the channel has reached capacity, no new items can be written until one has been read off.

So here's what is happening. The Producer starts producing data (with index 0) and putting it onto the channel. The consumer reads the first item (index 0) off of the channel and then throws an exception. The Consumer task is now faulted (meaning, an exception was thrown in the Task). The Consumer tasks is now "complete".

But the Producer keeps working. It keeps producing records until the channel reaches capacity (holding indexes 1 through 10). It then produces index 11 and then pauses. The code to write to the channel is waiting (in an async-friendly way) for there to be space in the channel. But the channel is at capacity. And since the Consumer is faulted, no more items will be read off of the channel. So the Producer is left waiting forever.

Since the Producer does not complete, the "await Task.WhenAll" will also wait forever. Because of this, the exception is never raised in this part of the code, so the "Main" try/catch block never has a chance to handle it. The application will never complete on its own.

One Solution - "await" Tasks Separately

One solution to this particular problem is to "await" the Producer and Consumer tasks separately. Here is what that code looks like.  (The code can be found in the "Program.cs" file of the "separate-await" project.)

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

	await consumer;
        await producer;
    }

Notice that instead of using "await Task.WhenAll", we now have separate "await consumer" and "await producer" sections. Since our producer is dependent upon the consumer making space available in the channel, it is best to wait for the consumer first here. (It's okay if the subtleties of that get lost; parallel async code is "fun".)

Here is the output from the code above:

Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Bad thing happened in Consumer (0)
Producing something: 4
Done

The output shows the Producer producing items (indexes 0 through 4 in this case). The Consumer pulls the first item (index 0) from the channel and then throws an exception.

The line "await consumer" above waits for the Consumer to complete. As soon as the Consumer task throws an exception, it is complete (in the "faulted" state). And since we awaited the faulted task, the exception gets raised.

The exception is not handled in this method, so the method short-circuits (meaning we never get to the "await producer" line). The exception bubbles up to the "Main" method, and that try/catch block outputs the exception message to the console ("Bad thing happened in Consumer (0)").

Most importantly, we see the "Done" message which means that our application exited on its own.

Side note: We get the message "Producing something: 4" after the exception message because the Producer is still running its code concurrently. So, the Producer has a chance to produce one more item before the application exits.

Task.WhenAll and Interdependent Tasks

So we've seen that if we have interdependent tasks, awaiting Task.WhenAll can lead our program to hang. In this situation, one task failing (the consumer) prevented the other task from completing (the producer). Since not all tasks complete, "await Task.WhenAll" ends up waiting forever.
If we have interdependent tasks, it is better to await the tasks separately. If we have tasks which are not dependent (such as the parallel and related examples above), then "await Task.WhenAll" usually works just fine.

More Solutions

There are other ways to fix the strange behavior of the sample application. You can take a look at some of the other solutions in the GitHub repository: https://github.com/jeremybytes/channel-exceptions.

Another solution is to change the bounded channel to an unbounded channel (in the "unbounded-channel" project). This gets rid of the capacity limitation and removes the dependency between the Producer task and Consumer task. [Update - article available: Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels]

Both this solution and the one presented in this article are short-circuiting -- meaning, if the Consumer throws an exception on one item, no other items will be consumed. The same is true of the Producer.

The GitHub repository also provides a more robust implementation (in the "doesnt-stop" project). In this project, if the Consumer throws an exception while processing an item, it will log the error and continue processing. (The same is true of the Producer in this project.)

Wrap Up

As we dive deeper into different ways of using asynchronous and parallel bits in our code, we often find strange behavior. And sometimes it can take a while to figure out where the strangeness is coming from. But the more that we learn, the easier it gets. So keep learning and keep writing amazing applications that make your users happy.

Happy Coding!

No comments:

Post a Comment