Using "Task.WhenAll" gives us a chance to pause our code until multiple tasks
  are complete. In addition, if there is an unhandled exception in one of those
  tasks, awaiting the WhenAll will raise that thrown exception so we can deal
  with it. This all works fine if our tasks are not dependent on each other. But
  if the tasks are interdependent, this can cause the application to hang.
Short Version: 
"Task.WhenAll" works great for independent tasks. If the tasks are dependent on one another, it can be better to await the individual tasks instead.
  When we start using tasks and parallel code in our applications, we can run
  into some weirdness that can be difficult to debug. Today we'll look at some
  of the weirdness that can occur around "Task.WhenAll" and understand how to
  alleviate it.
Motivation
  This article is based on a question sent in by Edington Watt regarding a
  presentation that I did about
  Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed
  strange behavior when exceptions are thrown, and he asked the question "What
  are the best exception handling practices in the Consumer Producer approach
  using Channel<T>?"
  One reason for the behavior was the way that "await Task.WhenAll" was used in
  the code. We will see some of that code and a solution to it after a bit of an
  overview of how I generally use "Task.WhenAll".
  Note: The full source code (including the original sample code and various
    approaches to fixing the strange behavior is available here: https://github.com/jeremybytes/channel-exceptions.
Articles
- Don't Use "Task.WhenAll" for Interdependent Tasks (this one)
- Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels
- Producer/Consumer Exception Handling - A More Reliable Approach
Awaiting Non-Dependent Tasks
  I use "Task.WhenAll" in a couple of scenarios. The first is when I want to run
  several of the same tasks in parallel, and I need to wait for all of them to
  complete.
Parallel Tasks
  Here is some code from my workshop on "Asynchronous and Parallel Programming
  in C#". (Alert: The last scheduled public workshop on this topic is coming up
    soon: https://jeremybytes.blogspot.com/2023/09/last-chance-full-day-workshop-on.html.)
    static async Task RunWithContinuation(List<int> ids)
    {
        List<Task> allContinuations = new();
        foreach (var id in ids)
        {
            Task<Person> personTask = reader.GetPersonAsync(id);
            Task continuation = personTask.ContinueWith(task => ...);
            allContinuations.Add(continuation);
        }
        await Task.WhenAll(allContinuations);
    }
  Inside the "foreach" loop, this code runs a method (GetPersonAsync) that
  returns a Task, and then sets up a continuation to run when that first task is
  complete. We do not "await" the tasks inside the loop because we want to run
  them in parallel.
  Each continuation also returns a Task (which is named "continuation"). The
  trick to this method is that even though I want all of these tasks to run in
  parallel, I do not want to exit the method until they are all complete.
This is where "Task.WhenAll" comes in.
  The continuation tasks are collected in a list (called "allContinuations").
  Then I pass that collection to "Task.WhenAll". When I "await" Task.WhenAll,
  the code will pause (in a nice async-friendly way) until all of the
  continuation tasks are complete. This means that the method
  ("RunWithContinuation") will not exit until all of the tasks have completed.
  These tasks are not dependent on each other. Each is its own
  atomic task: it gets a "Person" record based on an "id" parameter and then
  uses the result of that. (The body of the continuation is hidden, but it takes
  the "Person" record and outputs it for the user.)
  If any of these continuation tasks throws an exception, then that exception is
  raised when we "await Task.WhenAll".
But here is one key point:
The exception is not raised until after all of the tasks have completed.
  And here "completed" means that it finished successfully, with an error, or by
  being canceled.
  Since these tasks are not dependent on each other, they can all complete even
  if one (or more) of the tasks fails.
Related (but not Dependent) Tasks
  As another example, here is some code from a hands-on lab (from the same
  workshop mentioned above). This particular lab is about dealing with
  AggregateExceptions.
    await Task.WhenAll(orderDetailsTask, customerTask, productTask);
  In this code, we have 3 separate tasks: one to get order details, one to get a
  customer, and one to get a list of products. These 3 pieces are used to
  assemble a complete "Order". (Please don't write to me about how this isn't a
  good way to assemble a complex object; it is code specifically to allow for
  digging into AggregateException.)
  Similar to above, the "await" will cause this code to pause until all 3 of the
  tasks have completed. If there is an exception in any of the tasks, it will be
  raised here. As noted above, the exception will only be raised after all
  3 tasks are complete.
  These tasks are related to each other, but they are not dependent on each
  other. So one task failing will not stop the other tasks from completing.
Interdependent Tasks
  Now that we have a bit of an introduction to how "Task.WhenAll" behaves, let's
  look at a situation where the tasks are dependent on each other.
  As we'll see, this can cause us some problems.
  I refactored the code originally provided by Edington Watt and the WTW
    ICT Technology team to show different behaviors in the code. You can get the
    original code (and the various solutions) here: https://github.com/jeremybytes/channel-exceptions.
Starting Code
  We'll start at the root of the application. (The code can be found in the
  "Program.cs" file of the "refactored" project.)
    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        Console.WriteLine("Done");
    }
  When we "await" a Task that throws an exception, that exception is raised in
  our code. (If we do not await the Task, then we have to go looking for the
  exception in the Task properties.)
  The idea behind this code is that if an exception is thrown in the main
  "ProducerConsumerWithExceptions" method (that returns a Task), we can catch it
  and output a message to the console.
  Before looking at the behavior, lets see what "ProducerConsumerWithExceptions"
  does.
    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);
        await Task.WhenAll(producer, consumer);
    }
  This code uses the Producer/Consumer pattern with a Channel in the middle. The
  idea is that the Producer can produce data in parallel and then put the data
  onto the Channel. The Consumer reads data off of the Channel and does
  something with it (in this case, it outputs it to the console).
  This code has an "await Task.WhenAll". So this method will not exit until both
  of these tasks (the produder and the consumer) have completed. And if either
  of the tasks throws an exception, it will be raised here. Since we are not
  handling the exception in this method, it will bubble up and be handled in the
  "Main" method above.
Success State
  If both the producer and consumer complete successfully, we get output similar
  to the following:
...
Producing something: 12
Consuming object: 11
Producing something: 13
Consuming object: 12
Producing something: 14
Consuming object: 13
Producing something: 15
Consuming object: 14
Producing something: 16
Consuming object: 15
Producing something: 17
Consuming object: 16
Producing something: 18
Consuming object: 17
Producing something: 19
Consuming object: 18
Consuming object: 19
Done
  This is is just showing the end part of the output, but we can see that the
  producer is producing objects and the consumer is consuming objects. Once 20
  items have been produced (starting with index 0), the application prints
  "Done" and exits.
Error Behavior
  Things get strange when we get an exception. Here is the output when the
  Consumer throws an exception on reading the first record (index 0):
Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11
  From the output, the Producer produced 12 items (and then stopped). The
  Consumer throws an exception while reading the first record, so we only see
  output for 1 item here.
  At first it seems strange that we do not see the exception in the output;
  after all, that is part of the try/catch block in the Main method.
But here's the real problem: The application is still running!
Interdependent Tasks
The application is stuck on awaiting Task.WhenAll.
    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);
        await Task.WhenAll(producer, consumer);
    }
  The code is hung because the Producer and Consumer are interdependent. The
  reason for this is a bit subtle if you're not familiar with how
  Channel<T> works in C#.
  On the first line of this method, we create a bounded channel. A bounded
  channel is limited to holding a certain number of items. In this case, the
  channel is limited to holding 10 items. Once the channel has reached capacity,
  no new items can be written until one has been read off.
  So here's what is happening. The Producer starts producing data (with index 0)
  and putting it onto the channel. The consumer reads the first item (index 0)
  off of the channel and then throws an exception. The Consumer task is now
  faulted (meaning, an exception was thrown in the Task). The Consumer tasks is
  now "complete".
  But the Producer keeps working. It keeps producing records until the channel
  reaches capacity (holding indexes 1 through 10). It then produces index 11 and
  then pauses. The code to write to the channel is waiting (in an async-friendly
  way) for there to be space in the channel. But the channel is at capacity. And
  since the Consumer is faulted, no more items will be read off of the channel.
  So the Producer is left waiting forever.
  Since the Producer does not complete, the "await Task.WhenAll" will also wait
  forever. Because of this, the exception is never raised in this part of the
  code, so the "Main" try/catch block never has a chance to handle it. The
  application will never complete on its own.
One Solution - "await" Tasks Separately
  One solution to this particular problem is to "await" the Producer and
  Consumer tasks separately. Here is what that code looks like.  (The code
  can be found in the "Program.cs" file of the "separate-await" project.)
    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);
	await consumer;
        await producer;
    }
  Notice that instead of using "await Task.WhenAll", we now have separate "await
  consumer" and "await producer" sections. Since our producer is dependent upon
  the consumer making space available in the channel, it is best to wait for the
  consumer first here. (It's okay if the subtleties of that get lost; parallel
  async code is "fun".)
Here is the output from the code above:
Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Bad thing happened in Consumer (0)
Producing something: 4
Done
  The output shows the Producer producing items (indexes 0 through 4 in this
  case). The Consumer pulls the first item (index 0) from the channel and then
  throws an exception.
  The line "await consumer" above waits for the Consumer to complete. As soon as
  the Consumer task throws an exception, it is complete (in the "faulted"
  state). And since we awaited the faulted task, the exception gets raised.
  The exception is not handled in this method, so the method short-circuits
  (meaning we never get to the "await producer" line). The exception bubbles up
  to the "Main" method, and that try/catch block outputs the exception message
  to the console ("Bad thing happened in Consumer (0)").
  Most importantly, we see the "Done" message which means that our application
  exited on its own.
  Side note: We get the message "Producing something: 4" after the exception
    message because the Producer is still running its code concurrently. So, the
    Producer has a chance to produce one more item before the application
    exits.
Task.WhenAll and Interdependent Tasks
  So we've seen that if we have interdependent tasks, awaiting Task.WhenAll can
  lead our program to hang. In this situation, one task failing (the consumer)
  prevented the other task from completing (the producer). Since not all tasks
  complete, "await Task.WhenAll" ends up waiting forever.
If we have interdependent tasks, it is better to await the tasks separately. If we have tasks which are not dependent (such as the parallel and related examples above), then "await Task.WhenAll" usually works just fine.
More Solutions
  There are other ways to fix the strange behavior of the sample application.
  You can take a look at some of the other solutions in the GitHub
  repository: https://github.com/jeremybytes/channel-exceptions.
  Another solution is to change the bounded channel to an unbounded channel (in
  the "unbounded-channel" project). This gets rid of the capacity limitation and
  removes the dependency between the Producer task and Consumer task.
  [Update - article available:
    Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded
      Channels]
  Both this solution and the one presented in this article are short-circuiting
  -- meaning, if the Consumer throws an exception on one item, no other items
  will be consumed. The same is true of the Producer.
  The GitHub repository also provides a more robust implementation (in the
  "doesnt-stop" project). In this project, if the Consumer throws an exception
  while processing an item, it will log the error and continue processing. (The
  same is true of the Producer in this project.)
Wrap Up
  As we dive deeper into different ways of using asynchronous and parallel bits
  in our code, we often find strange behavior. And sometimes it can take a while
  to figure out where the strangeness is coming from. But the more that we
  learn, the easier it gets. So keep learning and keep writing amazing
  applications that make your users happy.
Happy Coding!








 
 
No comments:
Post a Comment