Tuesday, February 27, 2024

Getting Multiple Exceptions from Parallel.ForEachAsync

Parallel.ForEachAsync is a very useful tool for running code in parallel. Last time, we looked at what happens when an exception is thrown inside of the loop:

  1. If we "await" ForEachAsync, then we get a single exception (even if exceptions are thrown in multiple iterations of the loop).
  2. The loop short-circuits -- meaning not all items are processed.

Depending on what we are doing in the parallel loop, these behaviors may not be a concern. But there are situations where I would like to get all of the exceptions back, and there are times when I would like to capture the exceptions and continue processing.

In this series of articles, we look at these issues and how to deal with them.

Code samples for all articles are available here: https://github.com/jeremybytes/foreachasync-exception.

In this article, we will take on the first issue: how to get all available exceptions from the loop. The number of exceptions that we get depends on the second issue. Since the loop short-circuits, we end up with some exceptions, but not as many as if the loop were to complete normally. (The third article in the series shows how we can keep the loop from short-circuiting.)

2 Approaches

We will look at 2 approaches to getting the available exceptions.

  1. Using a Continuation
    With this approach, we look at the exceptions in a continuation instead of letting them bubble up through the ForEachAsync method/Task.

  2. Using ConfigureAwaitOptions.SuppressThrowing
    This uses a feature added in .NET 8: ConfigureAwaitOptions. With this approach, we suppress the exceptions in the loop and then use a "Wait" to show the aggregate exception. I came across this approach in Gérald Barré's article "Getting all exceptions thrown from Parallel.ForEachAsync".

The approaches both give access to the entire set of exceptions. The differences are subtle, and we'll look at those after we've seen both approaches.

A Reminder

As a reminder, here is how we left our code in the previous article (from the original-code/Program.cs file):

    try
    {
        await Parallel.ForEachAsync(
            Enumerable.Range(1, 100),
            new ParallelOptions() { MaxDegreeOfParallelism = 10 },
            async (i, _) =>
            {
                Console.WriteLine($"Processing item: {i}");
                await Task.Delay(10); // simulate async task
                MightThrowException(i);
                Interlocked.Increment(ref TotalProcessed);
            });
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }

Every 3rd iteration of the loop throws an exception. This causes the ForEachAsync loop to short-circuit (after 17 items, generally). Because of the "await" on ForEachAsync, only 1 of the exceptions is shown (out of 5, generally).
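The "MightThrowException" method and the counters are not shown in this article. Here is a minimal sketch of what they might look like. The method body, the exception type, and the "totalExceptions" counter are assumptions inferred from the output shown later, not the code from the repository.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the counter behind "Total Exceptions" in the output.
int totalExceptions = 0;

// Hypothetical stand-in for the helper called inside the loop.
void MightThrowException(int item)
{
    // Assumed rule: every 3rd item fails.
    if (item % 3 == 0)
    {
        Interlocked.Increment(ref totalExceptions);
        throw new Exception($"Bad thing happened inside loop ({item})");
    }
}

// Quick check without the parallel loop: items 1 through 9.
for (int i = 1; i <= 9; i++)
{
    try
    {
        MightThrowException(i);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Console.WriteLine($"Total Exceptions: {totalExceptions}");
```

With this rule, items 3, 6, and 9 throw, which lines up with the exception messages in the output later in the article.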

Using A Continuation

For the first approach, we will use a continuation. Instead of letting the exceptions bubble up through the ForEachAsync method call, we add a continuation that runs some code if there is an exception. From there, we can gracefully handle the exception(s).

Here is the code for that (from the "continuation/Program.cs" file):

    try
    {
        await Parallel.ForEachAsync(Enumerable.Range(1, 100),
            new ParallelOptions() { MaxDegreeOfParallelism = 10 },
            async (i, _) =>
            {
                Console.WriteLine($"Processing item: {i}");
                await Task.Delay(10); // simulate async task
                MightThrowException(i);
                Interlocked.Increment(ref TotalProcessed);
            })
            .ContinueWith(task =>
            {
                if (task.IsFaulted)
                    Console.WriteLine($"Exception: {task.Exception!.Message}");
            });
    }

After calling "Parallel.ForEachAsync", we add a continuation by calling ".ContinueWith". This specifies code that we want to run after a Task has completed.

The parameter of "ContinueWith" is a delegate that has the code we want to run when the Task is complete. The "task" parameter represents the ForEachAsync task itself. 

Because we have access to this task, we can check the "IsFaulted" property. A task is faulted if an exception is thrown in the task (which is exactly what we are expecting here). If an exception is thrown, then we output it to the console.

Output

When we run the code, we get the following output:


Processing item: 9
Processing item: 4
Processing item: 1
Processing item: 6
Processing item: 5
Processing item: 8
Processing item: 3
Processing item: 10
Processing item: 2
Processing item: 7
Processing item: 11
Processing item: 16
Processing item: 12
Processing item: 13
Processing item: 14
Processing item: 15
Processing item: 17
Exception: One or more errors occurred. (Bad thing happened inside loop (6)) (Bad thing happened inside loop (3)) (Bad thing happened inside loop (9)) (Bad thing happened inside loop (15)) (Bad thing happened inside loop (12))

Total Processed: 12
Total Exceptions: 5
Done (AggregateException from Continuation)

This shows us the 17 items that were processed, and the Exception message "One or more errors occurred" along with additional information. This is a typical message from an AggregateException.

For more specifics on AggregateException, you can take a look at "Task and Await: Basic Exception Handling" and "'await Task.WhenAll' Shows One Exception - Here's How to See Them All".

An AggregateException contains all of the exceptions that were thrown in a Task. It has a tree structure of inner exceptions that lets it handle various complexities of Task (including concurrent tasks and child tasks).
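To make the tree structure concrete, here is a small sketch that builds a nested AggregateException by hand. The exception types and messages are made up for illustration. It compares the direct inner exceptions with the result of "Flatten".

```csharp
using System;

// An inner AggregateException holding two exceptions.
var nested = new AggregateException(
    new InvalidOperationException("inner A"),
    new ArgumentException("inner B"));

// An outer AggregateException holding one plain exception
// plus the nested AggregateException above.
var outer = new AggregateException(
    new TimeoutException("top level"),
    nested);

// Direct children only: the TimeoutException and the nested aggregate.
int directCount = outer.InnerExceptions.Count;

// Flatten walks the tree and collects the non-aggregate exceptions.
int flatCount = outer.Flatten().InnerExceptions.Count;

Console.WriteLine($"Direct: {directCount}, Flattened: {flatCount}");
```

The direct count is 2 and the flattened count is 3, because "Flatten" replaces the nested AggregateException with the exceptions it contains.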

The Original Catch Block Does Nothing

The call to Parallel.ForEachAsync is inside a "try" block (just like in our original code). There is still a "catch" block, but it is not used in this scenario.

Here's the updated "catch" block:

    catch (Exception)
    {
        // You never get to this code. Exceptions are handled
        // in the continuation attached to ForEachAsync.

        // But just in case, rethrow the exception
        Console.WriteLine("You shouldn't get here");
        throw;
    }

As the comment says, we should never get to this catch block. But just in case we do, we rethrow the exception. In this case, it would result in an unhandled exception and the application would crash.

But why won't we get to this "catch" block?

Our code has changed in a subtle way. In the original code, we used "await" on the ForEachAsync method/Task. When we "await" a faulted Task, the AggregateException is not thrown; instead, one of the inner exceptions is thrown. (This is what we saw in the previous article.)

The difference here is that we no longer "await" the ForEachAsync method. Instead, we "await" the continuation task returned from "ContinueWith". That continuation task completes successfully even when the ForEachAsync task is faulted, and the code in our continuation is not likely to throw an exception, so we will not hit the "catch" block.
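The difference between the two kinds of "await" can be seen in isolation. The sketch below uses "Task.WhenAll" over two failing tasks as a stand-in for the ForEachAsync task; the "Fail" helper and the messages are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: an async operation that always fails.
static async Task Fail(string message)
{
    await Task.Yield();
    throw new InvalidOperationException(message);
}

// Case 1: await the faulted task directly.
Task combined = Task.WhenAll(Fail("first"), Fail("second"));
string awaitedType = "";
try
{
    await combined;
}
catch (Exception ex)
{
    // "await" rethrows one inner exception, not the AggregateException.
    awaitedType = ex.GetType().Name;
}

// The task itself still holds both exceptions.
int storedCount = combined.Exception!.InnerExceptions.Count;

// Case 2: await a continuation instead. The continuation completes
// normally, so nothing is thrown at this "await".
Task combined2 = Task.WhenAll(Fail("third"), Fail("fourth"));
int seenInContinuation = 0;
await combined2.ContinueWith(t =>
{
    if (t.IsFaulted)
        seenInContinuation = t.Exception!.InnerExceptions.Count;
});

Console.WriteLine($"Caught by await: {awaitedType}");
Console.WriteLine($"Stored on the task: {storedCount}");
Console.WriteLine($"Seen in the continuation: {seenInContinuation}");
```

In the first case the "catch" block sees a single InvalidOperationException. In the second case no "catch" block is needed at all, and the continuation sees both exceptions.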

Displaying the Inner Exceptions

So we now have access to the AggregateException. Let's display the inner exceptions. Here is a method to do that (from the "continuation/Program.cs" file):

    private static void ShowAggregateException(AggregateException ex)
    {
        StringBuilder builder = new();

        var innerExceptions = ex.Flatten().InnerExceptions;
        builder.AppendLine("======================");
        builder.AppendLine($"Aggregate Exception: Count {innerExceptions.Count}");

        foreach (var inner in innerExceptions)
            builder.AppendLine($"Continuation Exception: {inner!.Message}");
        builder.Append("======================");

        Console.WriteLine(builder.ToString());
    }

This flattens the inner exceptions (to get rid of the tree structure), loops through each exception, and builds a string to display on the console.

We just need to adjust our continuation to call this new method:

    .ContinueWith(task =>
    {
        if (task.IsFaulted)
            ShowAggregateException(task.Exception!);
    });

Now we get the following output:

Processing item: 9
Processing item: 2
Processing item: 6
Processing item: 1
Processing item: 3
Processing item: 8
Processing item: 7
Processing item: 4
Processing item: 10
Processing item: 5
Processing item: 11
Processing item: 12
Processing item: 16
Processing item: 14
Processing item: 15
Processing item: 13
Processing item: 17
======================
Aggregate Exception: Count 5
Continuation Exception: Bad thing happened inside loop (6)
Continuation Exception: Bad thing happened inside loop (3)
Continuation Exception: Bad thing happened inside loop (9)
Continuation Exception: Bad thing happened inside loop (12)
Continuation Exception: Bad thing happened inside loop (15)
======================

Total Processed: 12
Total Exceptions: 5
Done (AggregateException from Continuation)

Now we can see all 5 of the exceptions that were thrown in our loop. In this case, we are simply putting the exception message on the console. But we do have access to each of the full exceptions, including the stack traces. So we have access to the same information as if we were to "catch" them.

This shows all of the exceptions that were thrown by using a continuation. Now let's look at another way to get the exceptions.

Using ConfigureAwaitOptions.SuppressThrowing

I came across this approach in Gérald Barré's article "Getting all exceptions thrown from Parallel.ForEachAsync". You can read his original article for more details.

This code is in the "configure-await-options" project in the code repository.

An Extension Method

This approach uses an extension method to suppress throwing exceptions on the awaited task (to keep the AggregateException from getting unwrapped). Then it waits on the Task to get the AggregateException directly.

Note: This uses a feature added in .NET 8: ConfigureAwaitOptions. So, this approach will not work for earlier versions of .NET.

Here is the extension method (from the "configure-await-options/Program.cs" file):

    public static class Aggregate
    {
        internal static async Task WithAggregateException(this Task task)
        {
            await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
            task.Wait();
        }
    }

This extension method takes a Task as a parameter and returns a Task. So we can think of this as a decorator of sorts -- it modifies the functionality of a Task in a fairly transparent way to the caller.

At the heart of this is the "ConfigureAwait" call. "ConfigureAwait" got a new parameter in .NET 8: ConfigureAwaitOptions. If you want the details, I would recommend reading Stephen Cleary's article: ConfigureAwait in .NET 8.

In this code, "SuppressThrowing" keeps the Task from throwing an exception when it is awaited. If the task faults, execution simply continues past the "await", and the exception stays stored on the task.

The next line of the method calls "Wait" on the task. Generally, we want to avoid "Wait" because it is a blocking operation. But since we call "Wait" on a task that has already been awaited, we do not need to worry about this. The task is already complete.

But when we call "Wait" on a faulted task, the exception is thrown. And in this case, it is the full AggregateException (rather than the unwrapped inner exception that we normally get with "await").
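Here is a small sketch of those two steps outside of the extension method, assuming .NET 8 or later. As a stand-in for the ForEachAsync task, it uses "Task.WhenAll" over two failing tasks; the "Fail" helper is hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: an async operation that always fails.
static async Task Fail(string message)
{
    await Task.Yield();
    throw new InvalidOperationException(message);
}

Task combined = Task.WhenAll(Fail("first"), Fail("second"));

// Step 1: await without throwing (requires .NET 8 or later).
await combined.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

// Reaching this line means the "await" did not throw.
bool faulted = combined.IsFaulted;

// Step 2: "Wait" on the already-completed task. It does not block,
// and it throws the full AggregateException.
int caughtCount = 0;
try
{
    combined.Wait();
}
catch (AggregateException ex)
{
    caughtCount = ex.InnerExceptions.Count;
}

Console.WriteLine($"Faulted: {faulted}, exceptions caught: {caughtCount}");
```

The task is faulted after the "await", but the exceptions only surface at the "Wait" call, where both of them arrive inside one AggregateException.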

Using the Extension Method

To use the extension method, we tack it onto the end of the Parallel.ForEachAsync call (also in the "configure-await-options/Program.cs" file):

    try
    {
        await Parallel.ForEachAsync(Enumerable.Range(1, 100),
            new ParallelOptions() { MaxDegreeOfParallelism = 10 },
            async (i, _) =>
            {
                Console.WriteLine($"Processing item: {i}");
                await Task.Delay(10); // simulate async task
                MightThrowException(i);
                Interlocked.Increment(ref TotalProcessed);
            }).WithAggregateException();
    }
    catch (AggregateException ex)
    {
        ShowAggregateException(ex);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }

Notice that "WithAggregateException" is called on the "ForEachAsync" task. This means that the Task that is awaited at the top is the Task that has been modified by the extension method.

So instead of throwing an individual exception, this throws an AggregateException.

You can see that we have an additional "catch" block for AggregateException, and this uses the same "ShowAggregateException" method from the other example.

Output

The output is similar to the other example:

Processing item: 10
Processing item: 7
Processing item: 9
Processing item: 2
Processing item: 4
Processing item: 6
Processing item: 8
Processing item: 5
Processing item: 1
Processing item: 3
Processing item: 11
Processing item: 15
Processing item: 12
Processing item: 13
Processing item: 14
Processing item: 16
Processing item: 17
======================
Aggregate Exception: Count 5
Continuation Exception: Bad thing happened inside loop (6)
Continuation Exception: Bad thing happened inside loop (9)
Continuation Exception: Bad thing happened inside loop (3)
Continuation Exception: Bad thing happened inside loop (12)
Continuation Exception: Bad thing happened inside loop (15)
======================

Total Processed: 12
Total Exceptions: 5
Done (AggregateException from ConfigureAwaitOptions)

Now we can see all 5 of the exceptions that were thrown in our loop. As with the first example, we have access to each of the full exceptions, including the stack traces.

Differences in the Approaches

These approaches both accomplish the same thing. They give us access to all of the exceptions that were thrown inside the ForEachAsync loop. Here are some things to keep in mind.

Availability

Using a Continuation is available back to .NET 6 (technically, it goes back further, but ForEachAsync only goes back to .NET 6, so that's really what we care about).

ConfigureAwaitOptions is new in .NET 8. So this will work great going forward, but does not work with prior versions of .NET.

Which approach you prefer depends on what version of .NET you use. If your code is .NET 8, then either approach will work.
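If the same code has to compile for both .NET 8 and earlier versions, one option is to choose the "await" style with conditional compilation. The sketch below is neither of the two approaches exactly as written above. The "ObserveAsync" and "Fail" helpers are hypothetical, and the helper returns the AggregateException instead of throwing it.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical helper: an async operation that always fails.
static async Task Fail(string message)
{
    await Task.Yield();
    throw new InvalidOperationException(message);
}

// Hypothetical helper: waits for the task without throwing, then returns
// its AggregateException (null unless the task faulted).
static async Task<AggregateException?> ObserveAsync(Task task)
{
#if NET8_0_OR_GREATER
    // .NET 8 and later: suppress the exception at the await.
    await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
#else
    // Earlier versions: await a do-nothing continuation instead.
    await task.ContinueWith(_ => { }, TaskScheduler.Default);
#endif
    return task.Exception;
}

AggregateException? failure =
    await ObserveAsync(Task.WhenAll(Fail("first"), Fail("second")));
AggregateException? none = await ObserveAsync(Task.CompletedTask);

int failureCount = failure?.InnerExceptions.Count ?? 0;
bool noneIsNull = none is null;

Console.WriteLine($"Faulted task: {failureCount} exception(s)");
Console.WriteLine($"Completed task returned null: {noneIsNull}");
```

The "NET8_0_OR_GREATER" symbol is defined by the SDK based on the target framework, so each build only compiles the branch that applies to it.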

Throwing Exceptions

Using a Continuation does not throw the AggregateException. Instead, the AggregateException is cracked open and examined in the continuation. This is also why we do not hit the "catch" block that we have in the code.

Note: With this approach, the outer AggregateException is never thrown, so we do not get the stack trace and some other elements that are filled in when an exception is thrown. All of the inner exceptions (the ones we care about here) *have* been thrown, so they do have the stack trace and other information.
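The stack trace difference can be checked with a short sketch. As before, "Task.WhenAll" over two failing tasks stands in for the ForEachAsync task, and the "Fail" helper is hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: an async operation that always fails.
static async Task Fail(string message)
{
    await Task.Yield();
    throw new InvalidOperationException(message);
}

Task combined = Task.WhenAll(Fail("first"), Fail("second"));

// Await a do-nothing continuation so that nothing is thrown here.
await combined.ContinueWith(_ => { });

// The outer exception comes from the task; it was never thrown.
AggregateException outer = combined.Exception!;
Console.WriteLine($"Outer has stack trace: {outer.StackTrace is not null}");

// Each inner exception was thrown inside Fail, so its stack trace is set.
bool innersHaveTrace = outer.InnerExceptions.All(e => e.StackTrace is not null);
Console.WriteLine($"Inners have stack traces: {innersHaveTrace}");
```

Printing "inner.StackTrace" inside a loop over "InnerExceptions" is one way to see where each failure originated.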

ConfigureAwaitOptions does throw the AggregateException. When "task.Wait()" is called, the AggregateException is thrown. This is caught in the appropriate "catch" block where we can examine it.

Which approach you prefer will depend on your philosophy when it comes to exceptions and exception handling.

Throwing an exception is fairly expensive computationally. So some developers try to avoid throwing exceptions if processing can be handled another way.

On the other hand, some developers prefer to throw exceptions so that they can follow a consistent try/catch pattern throughout their code. This gives consistency and takes advantage of the chain-of-responsibility set up by the exception handling mechanism.

A Missing Feature

Both of these approaches give us access to all of the exceptions that were thrown. However, they do not address the short-circuiting problem. Even though our loop is set up to process 100 items, it stops processing after 17. As noted in the previous article, this is due to the way that Parallel.ForEachAsync is designed.

This missing feature may or may not be an issue, depending on what your code is trying to do.

Bottom Line

Both of these approaches accomplish our mission of getting all of the exceptions thrown in Parallel.ForEachAsync.

Wrap Up

Once we start diving into exception handling, we find that there are different options and different approaches. This includes addressing the default behavior of Parallel.ForEachAsync:

  1. If we "await" ForEachAsync, then we get a single exception (even if exceptions are thrown in multiple iterations of the loop).
  2. The loop short-circuits -- meaning not all items are processed.

With both of the approaches shown in this article, we have solved for behavior #1: we get access to the AggregateException of the ForEachAsync task. With this information, we can find all of the errors that occurred in the loop.

If this is all that we are concerned about, then we are good to go. But if we are concerned about item #2 (the loop short-circuiting), we need to look at another approach. And that is what we will do in the next article.

Happy Coding!
