Monday, February 26, 2024

Parallel.ForEachAsync and Exceptions

Parallel.ForEachAsync is a very useful tool for running code in parallel. But what happens when an exception is thrown inside the loop?

By default, the following things happen:

  1. If we "await" ForEachAsync, then we get a single exception (even if exceptions are thrown in multiple iterations of the loop).
  2. The loop short-circuits -- meaning not all items are processed.

Depending on what we are doing in the parallel loop, these items may not be a concern. But there are situations where I would like to get all of the exceptions back; and there are times when I would like to capture the exceptions and continue processing.

Over the next several articles, we will look at these issues and how to deal with them.

Code samples for all articles are available here: https://github.com/jeremybytes/foreachasync-exception.

This article shows the basics of using Parallel.ForEachAsync and what happens when exceptions are thrown.

For slides and code samples on Parallel.ForEachAsync (and other parallel approaches), you can take a look at the materials from my full-day workshop on asynchronous programming: https://github.com/jeremybytes/async-workshop-2022. (These materials use .NET 6.0. Updates for .NET 8.0 are coming in a few months.) For announcements on public workshops, check here: https://jeremybytes.blogspot.com/p/workshops.html.

Parallel.ForEachAsync Basics

The code for today's article is in the "original-code" folder of the GitHub repository. The early code is a work in progress; the repository contains only the finished code from the end of the article.

We'll start by looking at the basics of Parallel.ForEachAsync. Here's a bit of code that sets up a non-parallel loop:


    static async Task Main(string[] args)
    {
        Console.Clear();

        foreach (int i in Enumerable.Range(1, 100))
        {
            Console.WriteLine($"Processing item: {i}");
            await Task.Delay(10); // simulate async task
        }

        Console.WriteLine("Done (Original Code)");
    }

This code uses a regular foreach loop to iterate from 1 to 100. Inside the loop, we output to the console and simulate some async work with "Task.Delay(10)". This will delay processing for 10 milliseconds. Since this code is running sequentially, it will take about 1 second for the entire loop to complete.

Here is what the output looks like:

An animation of console output showing "Processing Item: 1" "Processing Item: 2" all the way to "Processing Item: 100". It takes about 1 second to complete the list. At the end "Done (Original Code)" is output.

Using Parallel.ForEachAsync

The next step is to change this to a parallel loop:


    static async Task Main(string[] args)
    {
        Console.Clear();

        await Parallel.ForEachAsync(
            Enumerable.Range(1, 100),
            async (i, _) =>
            {
                Console.WriteLine($"Processing item: {i}");
                await Task.Delay(10); // simulate async task
            });

        Console.WriteLine("Done (Original Code)");
    }

Here are a couple of notes on how this code works:

First, notice that we "await" the Parallel.ForEachAsync method. The loop runs asynchronously, so if we do not "await" here, then the Main method would keep going. Because of the "await", the last line (writing "Done" to the console) will not run until after all iterations of the loop are complete.

Next, let's look at the parameters for "ForEachAsync".

The first parameter (Enumerable.Range(1, 100)) is the IEnumerable to iterate through. This is the same as the "in" part of the non-parallel foreach loop.

The second parameter is a delegate that has the work we want to run in parallel.

Delegate Parameter
This delegate has 2 parameters (which we have as (i, _) here). The "i" parameter is the item in the current iteration of the loop. This is equivalent to the "i" in the foreach loop. We can use "i" inside the delegate body just like we can use "i" inside the body of the foreach loop. 

The second parameter of the delegate is a CancellationToken. Since we are not dealing with cancellation here, we use a discard "_" to represent this parameter.
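As a quick aside (this sketch is not part of the article's sample code), if we did want to support cancellation, we could supply a token through ParallelOptions and pass the delegate's token parameter along to Task.Delay instead of discarding it. The 50-millisecond timeout here is just an arbitrary value for illustration:

```csharp
// Hypothetical sketch: wiring up cancellation instead of discarding the token.
using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(50));

try
{
    await Parallel.ForEachAsync(
        Enumerable.Range(1, 100),
        new ParallelOptions { CancellationToken = cts.Token },
        async (i, token) =>
        {
            Console.WriteLine($"Processing item: {i}");
            await Task.Delay(10, token); // pass the token along
        });
}
catch (OperationCanceledException)
{
    Console.WriteLine("Loop was canceled");
}
```

When the token is canceled, awaiting ForEachAsync throws an OperationCanceledException, so we catch that separately from other exceptions.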

The body of the delegate has the actual work. This is the same as the contents of the foreach loop above. We output a line to the console and then simulate some work with await Task.Delay(10).

Because we have "await" in the body of the delegate, the delegate itself is also marked with the "async" modifier (before the parameters).

Output
Because our code is now running in parallel, it completes much faster. Here is what the output looks like (it is too fast to see well):


The speed will depend on how many virtual cores are available to do the processing. Parallel.ForEachAsync normally figures out how many resources to use on its own. We'll add some hints to it later on so we can get more consistent results.

One thing to note about the output is that "100" prints out before "98". This is one of the joys of parallel programming -- order is non-deterministic.

Now let's move on to see what happens when one or more of these items throws an exception.

Throwing an Exception

Here's a method that sometimes throws an exception:


    private static void MightThrowException(int item)
    {
        if (item % 3 == 0)
        {
            Interlocked.Increment(ref TotalExceptions);
            throw new Exception($"Bad thing happened inside loop ({item})");
        }
    }

This will throw an exception for every 3rd item. (We could hook this up to a random number generator, but this gives us some predictability while we look at results.)

Interlocked.Increment
There may be a line of code that does not look familiar here:

    Interlocked.Increment(ref TotalExceptions);

In this case "TotalExceptions" is a static integer field at the top of our class. This lets us keep track of how many exceptions are thrown.

"Interlocked.Increment" is a thread-safe way to increment a shared integer. Using the "++" operator is not thread-safe, and may result in incorrect values.
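To see the difference, here is a small standalone sketch (not from the article's repository) that increments two counters from a parallel loop, one with "++" and one with "Interlocked.Increment":

```csharp
// Standalone sketch: "++" is a read-modify-write operation and can lose
// updates under contention; Interlocked.Increment is atomic.
int unsafeCount = 0;
int safeCount = 0;

await Parallel.ForEachAsync(
    Enumerable.Range(1, 100_000),
    async (i, _) =>
    {
        unsafeCount++;                        // not thread-safe
        Interlocked.Increment(ref safeCount); // thread-safe
        await Task.CompletedTask;
    });

// safeCount always ends at 100000; unsafeCount is often lower.
Console.WriteLine($"unsafe: {unsafeCount}, safe: {safeCount}");
```

The exact value of the unsafe counter varies from run to run, which is exactly the problem: lost increments show up as silently wrong totals.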

Exceptions in the ForEachAsync Loop
Now we'll update the code to call "MightThrowException" inside our loop. Since we do not want an unhandled exception here, we will wrap the whole thing in a try/catch block:


    static async Task Main(string[] args)
    {
        Console.Clear();
        try
        {
            await Parallel.ForEachAsync(
                Enumerable.Range(1, 100),
                async (i, _) =>
                {
                    Console.WriteLine($"Processing item: {i}");
                    await Task.Delay(10); // simulate async task
                    MightThrowException(i);
                    Interlocked.Increment(ref TotalProcessed);
                });
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Exception: {ex.Message}");
        }

        Console.WriteLine($"\nTotal Processed: {TotalProcessed}");
        Console.WriteLine($"Total Exceptions: {TotalExceptions}");
        Console.WriteLine("Done (Original Code)");
    }

We've changed quite a few things.

First, we have wrapped the entire "ForEachAsync" call in a try/catch block. This is to make sure we do not have an unhandled exception.

Next, we have added the "MightThrowException" call inside of our loop. This will throw an exception for every 3rd item.

Next, we added "Interlocked.Increment(ref TotalProcessed);". This comes after the point where an exception might be thrown, so if no exception is thrown, we increment a "TotalProcessed" field (similar to the "TotalExceptions" field). This gives us a count of the items that were processed successfully.

In the "catch" block, we output the exception message.

Finally, we have console output for the total number of items processed successfully and the total number of exceptions.

Output
Here is the output for this code (note: this output is not animated):


Processing item: 15
Processing item: 16
Processing item: 17
Processing item: 26
Processing item: 18
Processing item: 19
Processing item: 20
Processing item: 21
Processing item: 22
Processing item: 23
Processing item: 24
Processing item: 25
Processing item: 27
Exception: Bad thing happened inside loop (6)

Total Processed: 18
Total Exceptions: 9
Done (Original Code)

This is just the last part of the output, but it tells us enough about what is happening.

The Issues

The output shows us the 2 issues from the start of this article. These may or may not concern us, depending on what we are doing. But we do need to be aware of them.

Hidden Exceptions

1. If we "await" ForEachAsync, then we get a single exception (even if exceptions are thrown in multiple iterations of the loop).

When an exception is thrown in a Task, it gets wrapped in an AggregateException. This is because Tasks can be complex (with concurrent and child tasks). An AggregateException wraps up all of the exceptions that happen into a single exception.

But when we "await" a Task, the AggregateException gets unwrapped for us. This can be good because we now have a "real" exception and do not have to deal with an AggregateException. But it can be bad because it hides the number of exceptions that actually occur.

Since we "await" the ForEachAsync method, we only see one exception: "Exception: Bad thing happened inside loop (6)". So this is only showing the exception for item #6.

But we can see in the "Total Exceptions" that 9 exceptions were thrown. The other exceptions are hidden from us here.
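As a hypothetical sketch (this is not the article's sample code, and not necessarily the approach the next article takes), one way to see the hidden exceptions is to hold on to the Task that ForEachAsync returns and inspect its Exception property, which contains the full AggregateException:

```csharp
// Sketch: keeping the Task around lets us see all aggregated exceptions,
// even though "await" only surfaces one of them.
Task loopTask = Parallel.ForEachAsync(
    Enumerable.Range(1, 100),
    async (i, _) =>
    {
        await Task.Delay(10);
        if (i % 3 == 0)
            throw new Exception($"Bad thing happened inside loop ({i})");
    });

try
{
    await loopTask; // throws a single (unwrapped) exception
}
catch (Exception)
{
    // The Task still holds all of the exceptions:
    AggregateException? all = loopTask.Exception;
    Console.WriteLine($"Exception count: {all?.InnerExceptions.Count}");
}
```

The number of inner exceptions depends on how many iterations ran before the loop short-circuited, so it will vary from run to run.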

Short-Circuited Loop

2. The loop short-circuits -- meaning not all items are processed.

The other thing to notice about the output is that the loop stops processing part way through. Only 27 of the 100 iterations of the loop ran. This is the nature of the ForEachAsync method. If a task throws an exception, the loop stops processing.

Depending on our scenario, we may want the loop to continue even if one of the iterations throws an exception.

We deal with both of these items in the next 2 articles.

A Little Consistency

Before leaving this code, let's add a little bit of consistency.

One of the problems with parallel code is that the decision of how many items to run at the same time is left up to the parallel infrastructure. If we have a lot of resources available, then there will be more items run in parallel.

But this also means that output will vary depending on what machine we are running on (and how many resources that machine has at the time). In this code, my desktop and laptop produce different results. The desktop generally stops after 27 items, the laptop will stop after 17 (sometimes fewer, depending on what else is going on).

Parallel.ForEachAsync has an optional parameter where we can set the maximum degree of parallelism. This will limit the number of items run concurrently, and if we set this to a value lower than our machine resources, it will also add some consistency to the output.

Here is our loop with the additional parameter. (This is the final state of our "original-code" project and can be found in the original-code/Program.cs file.)


    await Parallel.ForEachAsync(
        Enumerable.Range(1, 100),
        new ParallelOptions() { MaxDegreeOfParallelism = 10 },
        async (i, _) =>
        {
            Console.WriteLine($"Processing item: {i}");
            await Task.Delay(10); // simulate async task
            MightThrowException(i);
            Interlocked.Increment(ref TotalProcessed);
        });

This second parameter is a ParallelOptions object that sets the MaxDegreeOfParallelism property to 10. This means that a maximum of 10 items run concurrently. (It may be fewer items if there are not enough resources available.)

This gives me consistency between my mid-range machines.


Processing item: 6
Processing item: 9
Processing item: 5
Processing item: 1
Processing item: 3
Processing item: 8
Processing item: 11
Processing item: 16
Processing item: 12
Processing item: 13
Processing item: 14
Processing item: 15
Processing item: 17
Exception: Bad thing happened inside loop (9)

Total Processed: 12
Total Exceptions: 5
Done (Original Code)

Now I get a fairly consistent 17 items processed. I want the consistency here so that we can more readily compare results when we look at different ways of handling issues.

Wrap Up

So to recap, here are 2 things that we need to be aware of when we use Parallel.ForEachAsync:

  1. If we "await" ForEachAsync, then we get a single exception (even if exceptions are thrown in multiple iterations of the loop).
  2. The loop short-circuits -- meaning not all items are processed.

This may be fine for the work that we are doing, but we may want to go beyond that. We will tackle the first item in the next article. We'll look at 2 different ways to get all of the exceptions from ForEachAsync, and why we might choose one rather than the other.

In the 3rd article, we will tackle the issue of short-circuiting. So be sure to check back.

Happy Coding!
