Wednesday, February 28, 2024

Continue Processing with Parallel.ForEachAsync (even when exceptions are thrown)

Parallel.ForEachAsync is a very useful tool for running code in parallel. Recently, we have been exploring what happens when an exception is thrown inside the loop:

  1. If we "await" ForEachAsync, then we get a single exception (even if exceptions are thrown in multiple iterations of the loop).
  2. The loop short-circuits -- meaning not all items are processed.

In this series of articles, we look at these issues and how to deal with them.

Code samples for all articles are available here: https://github.com/jeremybytes/foreachasync-exception.

In the last article, we saw how to deal with behavior #1 by getting all of the available exceptions. In this article, we will look at behavior #2 -- short-circuiting. We can eliminate the short-circuiting of the loop so that all of the items are processed, and we can collect the exceptions along the way.

Short Version:

Handle exceptions inside the body of ForEachAsync

For slides and code samples on Parallel.ForEachAsync (and other parallel approaches), you can take a look at the materials from my full-day workshop on asynchronous programming: https://github.com/jeremybytes/async-workshop-2022. (These materials use .NET 6.0. Updates for .NET 8.0 are coming in a few months.) For announcements on public workshops, check here: https://jeremybytes.blogspot.com/p/workshops.html.

Prevent Short-Circuiting

Parallel.ForEachAsync will stop processing if it encounters an unhandled exception. This is the behavior that we've seen in our other examples. The result is that we only process 17 of the 100 items in our loop.

Since an unhandled exception is the cause of the short-circuit, we can continue processing by eliminating that unhandled exception.

And an easy way to eliminate an unhandled exception is to handle it.

Try/Catch inside the ForEachAsync Body

Here is the updated code (from the "doesnt-stop/Program.cs" file):

    await Parallel.ForEachAsync(Enumerable.Range(1, 100),
        new ParallelOptions() { MaxDegreeOfParallelism = 10 },
        async (i, _) =>
        {
            try
            {
                Console.WriteLine($"Processing item: {i}");
                await Task.Delay(10); // simulate async task
                MightThrowException(i);
                Interlocked.Increment(ref TotalProcessed);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Caught in Loop: {ex.Message}");
            }
        });

Here we have a try/catch block inside the body of ForEachAsync. If an exception is thrown, it is handled inside of the loop. From ForEachAsync's perspective, there are no unhandled exceptions, so it continues processing.

Output

In the output, all 100 items are processed -- or at least an attempt is made to process each one. Here is the last part of the output:


Caught in Loop: Bad thing happened inside loop (87)
Processing item: 92
Processing item: 93
Processing item: 94
Processing item: 95
Caught in Loop: Bad thing happened inside loop (84)
Processing item: 96
Caught in Loop: Bad thing happened inside loop (81)
Processing item: 97
Processing item: 98
Processing item: 99
Processing item: 100
Caught in Loop: Bad thing happened inside loop (99)
Caught in Loop: Bad thing happened inside loop (93)
Caught in Loop: Bad thing happened inside loop (96)

Total Processed: 67
Total Exceptions: 33
Done (Doesn't Stop for Exceptions)

All 100 items were attempted: 67 succeeded and 33 failed. This is what we expect based on our method that throws exceptions. (The failing items shown above are all multiples of 3, and there are 33 multiples of 3 between 1 and 100.)
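For anyone who wants to reproduce these counts without pulling the repository, here is a minimal, self-contained sketch. The MightThrowException body is a stand-in, not the version from the sample project: it assumes the method throws for every third item and increments the exception counter itself. The counters are local variables here rather than the static fields used in the article's code.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int totalProcessed = 0;
int totalExceptions = 0;

// Hypothetical stand-in for the article's MightThrowException.
// Assumption: it fails for every third item and counts the failure itself.
void MightThrowException(int item)
{
    if (item % 3 == 0)
    {
        Interlocked.Increment(ref totalExceptions);
        throw new Exception($"Bad thing happened inside loop ({item})");
    }
}

await Parallel.ForEachAsync(Enumerable.Range(1, 100),
    new ParallelOptions() { MaxDegreeOfParallelism = 10 },
    async (i, _) =>
    {
        try
        {
            await Task.Delay(10); // simulate async task
            MightThrowException(i);
            Interlocked.Increment(ref totalProcessed);
        }
        catch (Exception ex)
        {
            // Handled here, so ForEachAsync never sees the exception.
            Console.WriteLine($"Caught in Loop: {ex.Message}");
        }
    });

Console.WriteLine($"Total Processed: {totalProcessed}");   // 67
Console.WriteLine($"Total Exceptions: {totalExceptions}"); // 33
```

The order of the "Caught in Loop" lines will vary from run to run, but the totals should not.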

Observations

With this approach, we do not have to deal with AggregateException. Instead, we handle the individual exceptions as they occur. This could include logging or retrying the operation.
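As a rough illustration of the retry idea, the sketch below wraps the body in a small retry loop. Everything specific here is an assumption made for the example: the MaxAttempts limit, the FlakyOperation helper (which fails only on the first attempt for every third item), and the counters. A real retry policy would likely add a delay between attempts and be more selective about which exceptions are worth retrying.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int MaxAttempts = 3; // hypothetical retry limit
int totalProcessed = 0;
int totalFailed = 0;
var attemptsByItem = new ConcurrentDictionary<int, int>();

// Hypothetical flaky operation: every third item fails on its
// first attempt and succeeds on any later attempt.
void FlakyOperation(int item)
{
    int attempt = attemptsByItem.AddOrUpdate(item, 1, (_, n) => n + 1);
    if (item % 3 == 0 && attempt == 1)
    {
        throw new Exception($"Transient failure ({item})");
    }
}

await Parallel.ForEachAsync(Enumerable.Range(1, 100),
    new ParallelOptions() { MaxDegreeOfParallelism = 10 },
    async (i, _) =>
    {
        for (int attempt = 1; attempt <= MaxAttempts; attempt++)
        {
            try
            {
                await Task.Delay(10); // simulate async task
                FlakyOperation(i);
                Interlocked.Increment(ref totalProcessed);
                return; // success: stop retrying this item
            }
            catch (Exception ex) when (attempt < MaxAttempts)
            {
                Console.WriteLine($"Retrying item {i}: {ex.Message}");
            }
            catch (Exception ex)
            {
                // Last attempt failed: record it and move on to the next item.
                Interlocked.Increment(ref totalFailed);
                Console.WriteLine($"Giving up on item {i}: {ex.Message}");
            }
        }
    });

Console.WriteLine($"Total Processed: {totalProcessed}");
Console.WriteLine($"Total Failed: {totalFailed}");
```

With this stand-in, every item eventually succeeds, so the loop reports 100 processed and 0 failed. Either way, no exception escapes the body, so the loop still does not short-circuit.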

Because we have a standard try/catch block, we get the full exception (including stack trace and other information). We can log this information if we need to investigate further.

Since the loop does not stop, we do not need to worry about where the loop left off in a short-circuit. All of the items have a chance to be processed.

We do need to worry about concurrency. The body of the catch block could be running for multiple items at the same time. So we need to ensure that our logging methods and any other processing in the catch block are thread-safe.
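One way to collect exceptions safely from the catch block is to use a concurrent collection. The sketch below is not from the sample project; it assumes a ConcurrentQueue of (item, exception) pairs and a stand-in MightThrowException that throws for every third item. A plain List would not be safe here, since several catch blocks may add to it at the same time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Thread-safe: multiple iterations may enqueue at the same time.
var failures = new ConcurrentQueue<(int Item, Exception Error)>();

// Hypothetical stand-in: throws for every third item.
void MightThrowException(int item)
{
    if (item % 3 == 0)
    {
        throw new Exception($"Bad thing happened inside loop ({item})");
    }
}

await Parallel.ForEachAsync(Enumerable.Range(1, 100),
    new ParallelOptions() { MaxDegreeOfParallelism = 10 },
    async (i, _) =>
    {
        try
        {
            await Task.Delay(10); // simulate async task
            MightThrowException(i);
        }
        catch (Exception ex)
        {
            // Keep the full exception (including stack trace) for later.
            failures.Enqueue((i, ex));
        }
    });

// The loop is finished, so we can read the results on a single thread.
Console.WriteLine($"Total Exceptions: {failures.Count}");
foreach (var (item, error) in failures.OrderBy(f => f.Item))
{
    Console.WriteLine($"Item {item}: {error.Message}");
}
```

Because the queue keeps each exception together with its item, we can log or report the failures after the loop completes, without doing any shared work inside the catch block beyond the enqueue.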

Wrap Up

Ultimately, the approach we take depends on the specific needs of the process. But we do need to keep the default behavior of Parallel.ForEachAsync in mind:

  1. If we "await" ForEachAsync, then we get a single exception (even if exceptions are thrown in multiple iterations of the loop).
  2. The loop short-circuits -- meaning not all items are processed.

If getting an exception in the ForEachAsync loop is truly exceptional (meaning it really should never happen), then the default behavior may be fine (as we saw in the first article). In that case, an exception is a catastrophic error that stops the process and lets us know that (at least) one item failed.

It may be that short-circuiting is okay (because we need to restart the loop from the beginning in case of failure), but we still want more information about what happened. We can get all of the available exceptions by either using a continuation or by using ConfigureAwaitOptions (as we saw in the last article).
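As a quick reminder of what the ConfigureAwaitOptions approach can look like, here is a minimal sketch. It assumes .NET 8 (where ConfigureAwaitOptions.SuppressThrowing is available) and uses an inline stand-in that throws for every third item; it is not the code from the previous article. The loop still short-circuits, so the number of exceptions varies from run to run.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Start the loop, but hold on to the Task instead of awaiting it directly.
Task loop = Parallel.ForEachAsync(Enumerable.Range(1, 100),
    new ParallelOptions() { MaxDegreeOfParallelism = 10 },
    async (i, _) =>
    {
        await Task.Delay(10); // simulate async task
        if (i % 3 == 0) // stand-in failure condition (assumption)
        {
            throw new Exception($"Bad thing happened inside loop ({i})");
        }
    });

// Wait for completion without rethrowing (requires .NET 8).
await loop.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

int exceptionCount = 0;
if (loop.IsFaulted && loop.Exception is AggregateException aggregate)
{
    // The Task's AggregateException holds all of the available exceptions.
    foreach (var ex in aggregate.InnerExceptions)
    {
        exceptionCount++;
        Console.WriteLine($"Exception: {ex.Message}");
    }
}
Console.WriteLine($"Total Exceptions: {exceptionCount}");
```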

If we want to continue processing even if some of the items fail, then we can take the approach from this article -- put a try/catch block inside the body of ForEachAsync.

Whatever approach we take, it is always good to know that there are options. Each application has its own needs. Our job as programmers is to pick an option that works well for the application. So keep looking at options; you never know when you'll need a particular one.

Happy Coding!
