Friday, October 6, 2023

Producer/Consumer Exception Handling - A More Reliable Approach

In the last 2 articles, we looked at a couple of ways to deal with exceptions that can happen when using the Producer/Consumer pattern along with Channel<T> in C#. (Prior articles: "Don't Use 'Task.WhenAll' for Interdependent Tasks" and "Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels".)

Both of these approaches are short-circuiting, meaning that if the Consumer fails on one item, no further items are processed. The same is true of the Producer: if the Producer fails on one item, no further items are produced. This approach may work fine for particular applications. But what if we want something more robust?
By handling exceptions inside the Producer and Consumer, errors can be logged and processing can continue.
We can go back and look at the logged items to determine which items need to be redone. This is a more robust approach to exception handling for our sample application.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

Today we'll look at handling exceptions in a way that does not short-circuit the process.

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior) is available here: https://github.com/jeremybytes/channel-exceptions.


Interdependent Tasks

We'll go back to the starting place we used in the previous article. Here are the relevant methods (available in the Program.cs file of the "refactored" project):


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

I won't go through this code again here. Check the prior article for a walkthrough. This code works fine for the success state, but it has strange behavior if the Consumer throws an exception.

Consumer Exception

If the Consumer throws an exception while processing an item, the entire application hangs:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11

Here's a reminder of what happens:

In the "ProducerConsumerWithExceptions" method, we create a Bounded Channel. This means that the capacity of the channel is limited to 10 items. When the channel has reached that capacity, no new items can be written to it until space is made available for them.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 10 more items onto the channel (indexes 1 through 10).

Then the Producer produces index 11 and tries to put it onto the channel. At this point, the channel is at capacity (it has 10 items already), so the Producer waits (in an async-friendly way) for there to be space to write the next item.

And this is where the application hangs. The Producer will wait forever. Since "Task.WhenAll" is waiting for the Producer and Consumer to both finish, it will also wait forever. The application will not exit on its own.
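To watch this happen without killing the process by hand, here is a minimal, self-contained sketch of the same setup. It is not the code from the repository: the Producer and Consumer are inlined with "Task.Run", the Consumer always throws on its first item (like the hard-coded "MightThrowExceptionForConsumer"), and the "produced" counter and the 2-second timeout are hypothetical additions so that "Task.WhenAny" can report that "Task.WhenAll" is still waiting.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Same setup as the sample: a Bounded Channel that holds at most 10 items.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
int produced = 0; // hypothetical counter of items actually written

Task producer = Task.Run(async () =>
{
    try
    {
        for (int i = 0; i < 20; i++)
        {
            await channel.Writer.WriteAsync(i); // waits while the channel is full
            produced++;
        }
    }
    finally
    {
        channel.Writer.Complete();
    }
});

Task consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        // Stand-in for MightThrowExceptionForConsumer: always fails.
        throw new Exception($"Bad thing happened in Consumer ({item})");
    }
});

// Race WhenAll against a timeout instead of awaiting it directly.
Task all = Task.WhenAll(producer, consumer);
Task first = await Task.WhenAny(all, Task.Delay(TimeSpan.FromSeconds(2)));
bool hung = first != all;

Console.WriteLine($"Consumer faulted: {consumer.IsFaulted}");
Console.WriteLine($"Items written: {produced}");
Console.WriteLine(hung ? "Task.WhenAll is still waiting" : "Task.WhenAll completed");
```

With a capacity of 10, exactly 11 items get written: index 0 (which the Consumer read before failing) plus indexes 1 through 10 sitting in the channel. The write for index 11 is the one that never finishes.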

No Handling of Exceptions in the Producer/Consumer

One cause of the behavior is that the Producer and Consumer do not handle their own exceptions. They let them bubble up and (hopefully) get caught in the Main method's try/catch block.

Consumer

Here is the code for the Consumer (in the "Program.cs" file in the "refactored" project):


    static async Task Consumer(ChannelReader<int> reader)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Consuming object: {item}");
            MightThrowExceptionForConsumer(item);
        }
    }

This uses the "ReadAllAsync" method on the "ChannelReader<T>" class to read the items off of the Channel as they become available.

The call to "MightThrowExceptionForConsumer" throws an exception. (Although it has "might" in the name, it is hard-coded to always throw an exception here.) And because the exception is unhandled, the "foreach" loop will exit.

This relies on the exception eventually making its way back to the "Main" method where it will be caught and logged in the catch block.

Producer

The Producer code is similar (in the same "Program.cs" file in the "refactored" project):


    static async Task Producer(ChannelWriter<int> writer)
    {
        try
        {
            for (int i = 0; i < 20; i++)
            {
                Console.WriteLine($"Producing something: {i}");
                MightThrowExceptionForProducer(i);
                await Task.Delay(10);
                await writer.WriteAsync(i);
            }
        }
        finally
        {
            writer.Complete();
        }
    }

This uses a "for" loop to generate 20 items and write them to the Channel using "WriteAsync". This could also throw an exception in the "MightThrowExceptionForProducer" method (although for this sample, it is hard-coded to not throw an exception).

If an exception is thrown in the "for" loop, the loop will exit before finishing. And although we do not have a "catch" block to catch an exception, we do have a "finally" block.

In the "finally" block we mark the ChannelWriter as "Complete". This lets the Consumer know that no new items will be added to the Channel, and the Consumer can stop waiting for new items. This is in a "finally" block because even if we get an exception, we want to make sure that the ChannelWriter is marked "Complete".

But as with the Consumer, the Producer relies on the exception bubbling up to the "Main" method where it should be logged in the "catch" block.
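Here is a small sketch of why the "finally" block matters, assuming a hypothetical Producer that fails on item 5 (the sample's own Producer is hard-coded not to throw). Because the writer is marked "Complete" in the "finally" block, the Consumer's "await foreach" ends after reading items 0 through 4 instead of waiting for items that will never arrive, and the Producer's exception is still there to be observed when its Task is awaited.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
var consumed = new List<int>();

Task producer = Producer(channel.Writer);
Task consumer = Consumer(channel.Reader, consumed);

// The Consumer finishes because the Producer's "finally" marks the writer Complete.
await consumer;

string producerError = "(none)";
try
{
    await producer; // the Producer's exception is still observed here
}
catch (Exception ex)
{
    producerError = ex.Message;
}

Console.WriteLine($"Consumed {consumed.Count} items");
Console.WriteLine($"Producer error: {producerError}");

static async Task Producer(ChannelWriter<int> writer)
{
    try
    {
        for (int i = 0; i < 20; i++)
        {
            // Hypothetical failure point for illustration.
            if (i == 5)
                throw new Exception($"Bad thing happened in Producer ({i})");
            await writer.WriteAsync(i);
        }
    }
    finally
    {
        // Runs whether or not the loop threw, so the Consumer is never left waiting.
        writer.Complete();
    }
}

static async Task Consumer(ChannelReader<int> reader, List<int> results)
{
    // Ends once the writer is Complete and the remaining items have been read.
    await foreach (var item in reader.ReadAllAsync())
        results.Add(item);
}
```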

Adding Exception Handling to the Producer and Consumer

A more reliable way of dealing with exceptions is to handle them inside the Producer and the Consumer. This also has the effect of removing the dependency between the Producer and Consumer that we saw earlier. With the dependency removed, we can use a Bounded Channel if we need to, and we can also use "Task.WhenAll" to wait for the Producer and Consumer tasks to complete.

The code for this section is available in the "doesnt-stop" project.

Consumer

Here is the code for the updated Consumer (in the "Program.cs" file in the "doesnt-stop" project):

    static async Task Consumer(ChannelReader<int> reader)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            try
            {
                Console.WriteLine($"Consuming object: {item}");
                MightThrowExceptionForConsumer(item);
                TotalConsumed++;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
            }
        }
    }

Here we have added a try/catch block inside of the foreach loop. This means that if an exception is thrown, it will be caught and logged here, and the loop will continue to the next item.

The Consumer is no longer short-circuiting, so we no longer need to worry about the Consumer stopping before it has read all of the items from the Channel.

One other new item is "TotalConsumed++". This is a static variable that is updated with the total number of items successfully processed (it is after the "MightThrow..." method). This lets us track how many items were successful in the Consumer.

Producer

We have something very similar in the Producer (in the "Program.cs" file in the "doesnt-stop" project):

    static async Task Producer(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 100; i++)
        {
            try
            {
                Console.WriteLine($"Producing something: {i}");
                MightThrowExceptionForProducer(i);
                await Task.Delay(10);
                TotalProduced++;
                await writer.WriteAsync(i);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
            }
        }

        writer.Complete();
    }

Inside the "for" loop, we have a try/catch block. So if an exception is thrown, it is caught and logged, and the loop processing will continue.

Marking the ChannelWriter "Complete" no longer needs to be in a "finally" block. We will not have any unhandled exceptions escape here (and if we did get one, it would probably be an unrecoverable one).

One other new item is "TotalProduced++". This is a static variable that is updated with the total number of items successfully produced (it is after the "MightThrow..." method). This lets us track how many items were successful in the Producer.

Throwing Exceptions

To make the output more interesting, the methods that might throw exceptions use a random number generator to see if an exception should be thrown (in the "Program.cs" file in the "doesnt-stop" project):

    private static void MightThrowExceptionForProducer(int item)
    {
        if (Randomizer.Next() % 3 == 0)
            throw new Exception($"Bad thing happened in Producer ({item})");
    }

    private static void MightThrowExceptionForConsumer(int item)
    {
        if (Randomizer.Next() % 50 == 0)
            throw new Exception($"Bad thing happened in Consumer ({item})");
    }
The Producer throws an exception 1 out of 3 times (approximately). The Consumer throws an exception 1 out of 50 times (approximately).
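Random failures make the totals different on every run. For a repeatable version of the "doesnt-stop" program, here is a sketch that swaps the random checks for fixed rules (the Producer fails on multiples of 3, the Consumer fails on multiples of 25); these rules are assumptions for illustration. It also returns the totals from the Producer and Consumer tasks rather than using static "TotalProduced" and "TotalConsumed" fields, and it leaves out the "Task.Delay(10)".

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

Task<int> producer = Producer(channel.Writer);
Task<int> consumer = Consumer(channel.Reader);

// Neither task lets an exception escape, so Task.WhenAll is safe here.
int[] totals = await Task.WhenAll(producer, consumer);
Console.WriteLine($"Total Produced: {totals[0]}");
Console.WriteLine($"Total Consumed: {totals[1]}");

static async Task<int> Producer(ChannelWriter<int> writer)
{
    int totalProduced = 0;
    for (int i = 0; i < 100; i++)
    {
        try
        {
            // Deterministic stand-in for the random check: fail on multiples of 3.
            if (i % 3 == 0)
                throw new Exception($"Bad thing happened in Producer ({i})");
            totalProduced++;
            await writer.WriteAsync(i);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Logged: {ex.Message}");
        }
    }
    writer.Complete();
    return totalProduced;
}

static async Task<int> Consumer(ChannelReader<int> reader)
{
    int totalConsumed = 0;
    await foreach (var item in reader.ReadAllAsync())
    {
        try
        {
            // Deterministic stand-in: fail on multiples of 25.
            if (item % 25 == 0)
                throw new Exception($"Bad thing happened in Consumer ({item})");
            totalConsumed++;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Logged: {ex.Message}");
        }
    }
    return totalConsumed;
}
```

With these rules the Producer fails on 34 of the 100 indexes (0, 3, ..., 99) and produces 66 items. Of those, only 25 and 50 hit the Consumer rule (0 and 75 were never produced), so the Consumer reports 64. Because neither task lets an exception escape, "Task.WhenAll" completes even with a Bounded Channel.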

Program Output

So let's see how this program behaves. We only modified the Producer and Consumer code. The original "Main" and "ProducerConsumerWithExceptions" are unchanged. To give us more of a chance to get random exceptions, I also changed the code so that it produces 100 items (instead of 20).

Consuming object: 90
Logged: Bad thing happened in Producer (91)
Producing something: 92
Producing something: 93
Consuming object: 92
Producing something: 94
Consuming object: 93
Logged: Bad thing happened in Producer (94)
Producing something: 95
Producing something: 96
Consuming object: 95
Producing something: 97
Consuming object: 96
Producing something: 98
Consuming object: 97
Logged: Bad thing happened in Consumer (97)
Producing something: 99
Consuming object: 98
Consuming object: 99
Total Produced: 69
Total Consumed: 66
Done
This is just the tail end of the output. It shows several logged items (2 Producer errors and 1 Consumer error).

Notice that the Total Produced (69) and Total Consumed (66) do not match. This is because the Producer only produced 69 items (of the 100), so the Consumer had a chance to process at most 69 items. The Consumer also failed on 3 of those items (only one of those errors appears in this tail), so its total is 3 lower.

"Done" is printed out at the bottom. This lets us know that the application ran to completion and exited.

A More Reliable Approach

Our application is more reliable when the Producer and Consumer can deal with their own exceptions. This is not always possible, but it is a good approach for the sample code that we have here.

But we always need to consider what our particular program is doing. I had a great question come up in a recent workshop. The developer asked, 
"If we get an error in the Consumer, can we just put the item back onto the Channel for it to try again?"
If the application has random hiccups that can be solved by re-processing, then maybe this is a good solution. But there is a problem: what if it isn't a random hiccup?

If the Consumer fails every time it tries to process a particular record, then putting it back on the Channel will only create an endless loop of failures.

One approach that we came up with was to have a separate Channel for errors. So if the Consumer failed to process an item, it would put it on the error channel. This would be processed in a different workflow. For example, the Consumer of the error channel could try to process the item again and then log an error if it was unsuccessful. This would give the application a chance to try again (to handle the random hiccups) and also to log errors for items that could not be processed at all.
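Here is a minimal sketch of that error-channel workflow. The names ("ErrorConsumer", "Process") and the failure rules are hypothetical: "Process" simulates a random hiccup on multiples of 7 (failing on the first attempt only) and a permanent failure on multiples of 10. Failed items go onto a separate channel, the error Consumer retries each one once, and anything that still fails is logged instead of being put back on the main channel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var work = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
var errors = Channel.CreateUnbounded<int>(); // separate channel for failed items

Task producer = Producer(work.Writer);
Task<int> consumer = Consumer(work.Reader, errors.Writer);
Task<(int Recovered, List<int> Failed)> errorConsumer = ErrorConsumer(errors.Reader);

await Task.WhenAll(producer, consumer, errorConsumer);

int firstPass = await consumer;
var (recoveredCount, failedItems) = await errorConsumer;
Console.WriteLine($"First pass: {firstPass}, recovered on retry: {recoveredCount}");
Console.WriteLine($"Could not be processed: {string.Join(", ", failedItems)}");

static async Task Producer(ChannelWriter<int> writer)
{
    for (int i = 1; i <= 20; i++)
        await writer.WriteAsync(i);
    writer.Complete();
}

static async Task<int> Consumer(ChannelReader<int> reader, ChannelWriter<int> errorWriter)
{
    int succeeded = 0;
    try
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            try
            {
                Process(item, attempt: 1);
                succeeded++;
            }
            catch (Exception)
            {
                // Don't put the item back on the main channel (a permanent failure
                // would loop forever); hand it to the error workflow instead.
                await errorWriter.WriteAsync(item);
            }
        }
    }
    finally
    {
        errorWriter.Complete(); // lets the error Consumer finish
    }
    return succeeded;
}

static async Task<(int Recovered, List<int> Failed)> ErrorConsumer(ChannelReader<int> reader)
{
    int recovered = 0;
    var failed = new List<int>();
    await foreach (var item in reader.ReadAllAsync())
    {
        try
        {
            Process(item, attempt: 2); // one retry for random hiccups
            recovered++;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Logged: {ex.Message}");
            failed.Add(item);
        }
    }
    return (recovered, failed);
}

// Hypothetical processing step: multiples of 7 hiccup on the first attempt only,
// multiples of 10 fail every time.
static void Process(int item, int attempt)
{
    if (item % 10 == 0 || (item % 7 == 0 && attempt == 1))
        throw new Exception($"Could not process {item} (attempt {attempt})");
}
```

Running items 1 through 20 through this gives 16 first-pass successes; 7 and 14 are recovered on the retry, and 10 and 20 end up in the failed list. The error channel is unbounded in this sketch so that a slow retry workflow can never make the main Consumer wait; that is a design choice, not a requirement.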

Wrap Up

As with most code, there are a lot of options. When I'm writing code, I usually spend a lot more time thinking than typing. Working through different scenarios lets me come up with a solution at a high level and figure out how all of the pieces fit together. Typing the code is the easy part.

Happy Coding!

Thursday, October 5, 2023

Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels

In the last article, "Don't Use 'Task.WhenAll' for Interdependent Tasks", we saw how using WhenAll for tasks that are interdependent can cause an application to hang if one of the tasks throws an exception. As a solution, we changed from using "Task.WhenAll" to awaiting the tasks separately. But there are other approaches.

In this article, we will focus on breaking the dependency between the tasks. In the sample code, the channel is bounded (meaning, it has a maximum capacity). This can create a dependency between the Producer and Consumer. One way to break the dependency is to use an unbounded channel.

A Bounded Channel can create a dependency between a Producer and Consumer. Using an Unbounded Channel is one way to break that dependency.

Let's do a quick review of the code that we saw yesterday and then look at how changing the channel can affect the code.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

One reason for the behavior was the way that a bounded channel created a dependency between the Producer and Consumer. If we can break that dependency, the behavior will change.

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior) is available here: https://github.com/jeremybytes/channel-exceptions.


Interdependent Tasks

We'll go back to the starting place we used in the previous article. Here are the relevant methods (available in the Program.cs file of the "refactored" project):


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

I won't go through this code again here. Check the previous article for a walkthrough. This code works fine for the success state, but it has strange behavior if the Consumer throws an exception.

Consumer Exception

If the Consumer throws an exception while processing an item, the entire application hangs:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11

Here's a reminder of what happens:

In the "ProducerConsumerWithExceptions" method, we create a Bounded Channel. This means that the capacity of the channel is limited to 10 items. When the channel has reached that capacity, no new items can be written to it until space is made available for them.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 10 more items onto the channel (indexes 1 through 10).

Then the Producer produces index 11 and tries to put it onto the channel. At this point, the channel is at capacity (it has 10 items already), so the Producer waits (in an async-friendly way) for there to be space to write the next item.

And this is where the application hangs. The Producer will wait forever. Since "Task.WhenAll" is waiting for the Producer and Consumer to both finish, it will also wait forever. The application will not exit on its own.

Using an Unbounded Channel

What I would like to do is remove the dependency between the Producer and Consumer. Even if the Consumer breaks, the Producer should keep going. One way of doing this is to use an Unbounded Channel (meaning, a channel with no capacity limit).

This is a small change to this application in the ProducerConsumerWithExceptions method. (This code is in the Program.cs file of the "unbounded-channel" project):


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateUnbounded<int>();

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

Now the Channel is no longer limited to 10 items.

Error Output

Let's take a look at the error output now:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11
Producing something: 12
Producing something: 13
Producing something: 14
Producing something: 15
Producing something: 16
Producing something: 17
Producing something: 18
Producing something: 19
Bad thing happened in Consumer (0)
Done

The most important part of this output is that we have the "Done" message. This tells us that the application finished on its own. We also see the error message "Bad thing happened in Consumer (0)" just before the "Done".

Here's how things work with this code.

In the "ProducerConsumerWithExceptions" method, we create an Unbounded Channel. This means that the capacity of the channel is not limited.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 19 more items onto the channel (indexes 1 through 19). Once it has produced all 20 items, the Producer completes.

The Producer Task has completed (successfully) and the Consumer Task has completed (with error). This means that "await Task.WhenAll" can move forward.

Since one of the tasks got an exception, the "await" will raise that exception here. But "ProducerConsumerWithExceptions" does not handle the exception, so it bubbles up to the "Main" method.

The "Main" method has a try/catch block. It catches the thrown exception, prints the error message to the console, outputs "Done", and then the application exits.
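Here's a condensed sketch of the same run with an Unbounded Channel, so that the behavior can be checked in code. It inlines the Producer and Consumer with "Task.Run" (an assumption for brevity), and the "written" counter is a hypothetical addition. The Producer writes all 20 items, "await Task.WhenAll" completes, and the Consumer's exception is caught just like in the "Main" method.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
int written = 0; // hypothetical counter

Task producer = Task.Run(async () =>
{
    try
    {
        for (int i = 0; i < 20; i++)
        {
            await channel.Writer.WriteAsync(i); // never waits: there is no capacity limit
            written++;
        }
    }
    finally
    {
        channel.Writer.Complete();
    }
});

Task consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        throw new Exception($"Bad thing happened in Consumer ({item})");
    }
});

string message = "";
try
{
    await Task.WhenAll(producer, consumer); // both tasks finish, so this completes
}
catch (Exception ex)
{
    message = ex.Message;
}
Console.WriteLine(message);
Console.WriteLine("Done");
```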

Bounded vs. Unbounded Channels

What we've seen here is that using a Bounded Channel can create a dependency between a Producer and Consumer. If the Consumer fails and the Channel reaches capacity, the Producer will never complete.

With an Unbounded Channel, we no longer have that same dependency.

I say that a Bounded Channel "can" create a dependency because it doesn't necessarily create a dependency. There are different ways to write to a Channel that would not create a dependency.

Writing to a Channel

There are multiple ways of writing to a channel. In this code, the Producer uses "WriteAsync":
    await writer.WriteAsync(i);

"WriteAsync" is a method on the ChannelWriter<T> class. If the channel is at capacity, it will wait until there is space available. This is the easiest way to write to a channel, so I tend to use this most of the time. It does have the downside of possibly waiting forever if space is never made available.

But there are other approaches. The ChannelWriter<T> also has a "TryWrite" method:
    bool success = writer.TryWrite(i);

"TryWrite" is not asynchronous. If it writes to the channel successfully, it returns "true". But if the channel is at capacity and the data cannot be written, it returns "false".

This has the advantage of not waiting forever. The disadvantage is that we need a little more code to handle the workflow. What happens if "TryWrite" returns "false"? Do we wait a bit and retry? Do we pause our Producer? Do we simply discard that item? Do we log the item and go to the next one? It depends on what our application needs to do. But the flexibility is there for us to decide.
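As a sketch of one of those choices, here is "TryWrite" against a Bounded Channel with no Consumer reading from it. The capacity of 3 is arbitrary, and "log the item and go to the next one" is just one of the policies listed above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3));
var skipped = new List<int>();

for (int i = 0; i < 5; i++)
{
    // TryWrite never waits: it returns false when the channel is at capacity.
    bool success = channel.Writer.TryWrite(i);
    if (!success)
    {
        // One possible policy: log the item and go on to the next one.
        Console.WriteLine($"Channel full, skipped item {i}");
        skipped.Add(i);
    }
}

// Drain what made it in (TryRead also never waits).
int buffered = 0;
while (channel.Reader.TryRead(out _))
    buffered++;
Console.WriteLine($"Items written to the channel: {buffered}");
```

The first 3 writes succeed; items 3 and 4 are skipped without the loop ever waiting.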

To help us with the workflow, there is also a "WaitToWriteAsync" method:
    await writer.WaitToWriteAsync();

If the channel is at capacity, this will wait (in an async-friendly way) for space to be available. We can use this to be pretty sure that "TryWrite" will succeed. (But with parallel code, it might not always be the case.)

When we await "WaitToWriteAsync", however, we are back to a situation where we may end up waiting forever if the channel is at capacity and no items are read off of it.
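Here is a minimal sketch of combining the two: wait for space, then try to write, and loop if another writer got there first. The helper name "WriteWhenSpaceAsync" is hypothetical. Note that "WaitToWriteAsync" returns a "ValueTask<bool>"; it completes with "false" when the channel has been marked complete and no more writes will be accepted, so the helper stops in that case rather than spinning.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2));
Task producer = Producer(channel.Writer, 10);

int received = 0, sum = 0;
await foreach (var value in channel.Reader.ReadAllAsync())
{
    received++;
    sum += value;
}
await producer;
Console.WriteLine($"Received {received} items (sum {sum})");

static async Task Producer(ChannelWriter<int> writer, int count)
{
    for (int i = 0; i < count; i++)
    {
        if (!await WriteWhenSpaceAsync(writer, i))
            break; // channel was completed elsewhere; stop producing
    }
    writer.TryComplete(); // unlike Complete, does not throw if already completed
}

// Hypothetical helper: wait for space, then try to write; retry if another
// writer took the space first.
static async Task<bool> WriteWhenSpaceAsync(ChannelWriter<int> writer, int item)
{
    // WaitToWriteAsync returns false once the channel is marked complete.
    while (await writer.WaitToWriteAsync())
    {
        if (writer.TryWrite(item))
            return true;
    }
    return false;
}
```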

Cancellation Tokens

The async methods on ChannelWriter<T> also take optional cancellation tokens:
    await writer.WriteAsync(i, cancellationToken);

We could be pretty creative with a cancellation token. For example, if the Consumer throws an exception, it could also cancel a CancellationTokenSource shared with the Producer, which puts the token into the "cancellation requested" state. Then "WriteAsync" would stop waiting and throw an OperationCanceledException.

The same is true if we use a cancellation token with "WaitToWriteAsync".
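Here is a sketch of that idea, assuming a "CancellationTokenSource" that is shared between the Producer and Consumer (the sample code does not do this). When the Consumer fails, it cancels the source before rethrowing. The Producer's pending "WriteAsync" then throws an OperationCanceledException, so the Producer finishes and "await Task.WhenAll" raises the Consumer's exception instead of hanging on the Bounded Channel.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
using var cts = new CancellationTokenSource(); // hypothetical: shared by both sides

Task producer = Producer(channel.Writer, cts.Token);
Task consumer = Consumer(channel.Reader, cts);

string outcome = "";
try
{
    await Task.WhenAll(producer, consumer); // no longer hangs
}
catch (Exception ex)
{
    outcome = ex.Message;
}
Console.WriteLine(outcome);
Console.WriteLine("Done");

static async Task Producer(ChannelWriter<int> writer, CancellationToken token)
{
    try
    {
        for (int i = 0; i < 20; i++)
        {
            // Throws OperationCanceledException if the token is canceled while waiting.
            await writer.WriteAsync(i, token);
        }
    }
    finally
    {
        writer.Complete();
    }
}

static async Task Consumer(ChannelReader<int> reader, CancellationTokenSource source)
{
    try
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            throw new Exception($"Bad thing happened in Consumer ({item})");
        }
    }
    catch
    {
        source.Cancel(); // tell the Producer to stop waiting for space
        throw;
    }
}
```

Because an async method that ends with an OperationCanceledException finishes in the Canceled state (not Faulted), "Task.WhenAll" only reports the Consumer's exception here.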

A Lot of Options

As with many things in C#, Channel<T> has a lot of options. This can be good because of the flexibility that we have as programmers. If we need to do something a little unusual, there are often methods and properties that help us out. The downside is that there is a bit more to learn, and it can be confusing to figure out which approach is the best one for your application.

More Solutions

So we've seen a couple of solutions to fix the weird behavior of the sample application when exceptions are thrown.

If we await the Producer and Consumer tasks separately, we do not need to be as concerned about them being dependent on one another.
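For reference, here is a minimal sketch of awaiting the tasks separately. The order of the awaits is an assumption of this sketch (the version in the previous article and the GitHub repository may differ): the Consumer is awaited first, so its exception surfaces right away. The Producer is left waiting on the full channel, but since that wait does not block a thread, the application can still print "Done" and exit.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

string result;
try
{
    await ProducerConsumerWithExceptions();
    result = "completed";
}
catch (Exception ex)
{
    result = ex.Message;
}
Console.WriteLine(result);
Console.WriteLine("Done");

static async Task ProducerConsumerWithExceptions()
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

    Task producer = Producer(channel.Writer);
    Task consumer = Consumer(channel.Reader);

    // Await the Consumer first: if it faults, the exception surfaces right away
    // instead of waiting on a Producer that may never finish.
    await consumer;
    await producer;
}

static async Task Producer(ChannelWriter<int> writer)
{
    try
    {
        for (int i = 0; i < 20; i++)
            await writer.WriteAsync(i);
    }
    finally
    {
        writer.Complete();
    }
}

static async Task Consumer(ChannelReader<int> reader)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        throw new Exception($"Bad thing happened in Consumer ({item})");
    }
}
```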

If we change the Channel from Bounded to Unbounded, we remove the dependency between the Producer and Consumer.

If we add some exception handling inside the Producer and Consumer Tasks, we can continue processing data even if there are errors. (This approach is shown in the "doesnt-stop" project in the GitHub repository: https://github.com/jeremybytes/channel-exceptions.) We'll look at this in a future article.

Wrap Up

As we dive deeper into different ways of using asynchronous and parallel bits in our code, we often find strange behavior. And sometimes it can take a while to figure out where the strangeness is coming from. But the more that we learn, the easier it gets. So keep learning and keep writing amazing applications that make your users happy.

Happy Coding!

Wednesday, October 4, 2023

Don't Use "Task.WhenAll" for Interdependent Tasks

Using "Task.WhenAll" gives us a chance to pause our code until multiple tasks are complete. In addition, if there is an unhandled exception in one of those tasks, awaiting the WhenAll will raise that thrown exception so we can deal with it. This all works fine if our tasks are not dependent on each other. But if the tasks are interdependent, this can cause the application to hang.

Short Version: 
"Task.WhenAll" works great for independent tasks. If the tasks are dependent on one another, it can be better to await the individual tasks instead.
When we start using tasks and parallel code in our applications, we can run into some weirdness that can be difficult to debug. Today we'll look at some of the weirdness that can occur around "Task.WhenAll" and understand how to alleviate it.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

One reason for the behavior was the way that "await Task.WhenAll" was used in the code. We will see some of that code and a solution to it after a bit of an overview of how I generally use "Task.WhenAll".

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior) is available here: https://github.com/jeremybytes/channel-exceptions.


Awaiting Non-Dependent Tasks

I use "Task.WhenAll" in a couple of scenarios. The first is when I want to run several of the same tasks in parallel, and I need to wait for all of them to complete.

Parallel Tasks

Here is some code from my workshop on "Asynchronous and Parallel Programming in C#". (Alert: The last scheduled public workshop on this topic is coming up soon: https://jeremybytes.blogspot.com/2023/09/last-chance-full-day-workshop-on.html.)


    static async Task RunWithContinuation(List<int> ids)
    {
        List<Task> allContinuations = new();

        foreach (var id in ids)
        {
            Task<Person> personTask = reader.GetPersonAsync(id);
            Task continuation = personTask.ContinueWith(task => ...);
            allContinuations.Add(continuation);
        }

        await Task.WhenAll(allContinuations);
    }

Inside the "foreach" loop, this code runs a method (GetPersonAsync) that returns a Task, and then sets up a continuation to run when that first task is complete. We do not "await" the tasks inside the loop because we want to run them in parallel.

Each continuation also returns a Task (which is named "continuation"). The trick to this method is that even though I want all of these tasks to run in parallel, I do not want to exit the method until they are all complete.

This is where "Task.WhenAll" comes in.

The continuation tasks are collected in a list (called "allContinuations"). Then I pass that collection to "Task.WhenAll". When I "await" Task.WhenAll, the code will pause (in a nice async-friendly way) until all of the continuation tasks are complete. This means that the method ("RunWithContinuation") will not exit until all of the tasks have completed.

These tasks are not dependent on each other. Each is its own atomic task: it gets a "Person" record based on an "id" parameter and then uses the result of that. (The body of the continuation is hidden, but it takes the "Person" record and outputs it for the user.)

If any of these continuation tasks throws an exception, then that exception is raised when we "await Task.WhenAll".

But here is one key point:
The exception is not raised until after all of the tasks have completed.
And here "completed" means that it finished successfully, with an error, or by being canceled.

Since these tasks are not dependent on each other, they can all complete even if one (or more) of the tasks fails.
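Here is a small sketch of that key point, using two hypothetical tasks: one fails after about 10 milliseconds, and the other succeeds after about 500. The exception is not raised until the slow task has also completed.

```csharp
using System;
using System.Threading.Tasks;

Task fast = FailFastAsync();
Task slow = SucceedSlowlyAsync();

string caught = "";
bool slowDoneWhenCaught = false;
try
{
    await Task.WhenAll(fast, slow);
}
catch (InvalidOperationException ex)
{
    caught = ex.Message;
    // WhenAll does not raise the exception until every task has completed.
    slowDoneWhenCaught = slow.IsCompleted;
}
Console.WriteLine($"Caught \"{caught}\"; slow task completed: {slowDoneWhenCaught}");

// Hypothetical task that fails after about 10 ms.
static async Task FailFastAsync()
{
    await Task.Delay(10);
    throw new InvalidOperationException("fast task failed");
}

// Hypothetical task that succeeds after about 500 ms.
static async Task SucceedSlowlyAsync()
{
    await Task.Delay(500);
}
```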

Related (but not Dependent) Tasks

As another example, here is some code from a hands-on lab (from the same workshop mentioned above). This particular lab is about dealing with AggregateExceptions.


    await Task.WhenAll(orderDetailsTask, customerTask, productTask);

In this code, we have 3 separate tasks: one to get order details, one to get a customer, and one to get a list of products. These 3 pieces are used to assemble a complete "Order". (Please don't write to me about how this isn't a good way to assemble a complex object; it is code specifically to allow for digging into AggregateException.)

Similar to above, the "await" will cause this code to pause until all 3 of the tasks have completed. If there is an exception in any of the tasks, it will be raised here. As noted above, the exception will only be raised after all 3 tasks are complete.

These tasks are related to each other, but they are not dependent on each other. So one task failing will not stop the other tasks from completing.
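When more than one of the related tasks fails, "await" only rethrows the first exception. Here is a sketch (with hypothetical stand-ins for the three tasks) showing that the full set is still available through the "Exception" property of the Task returned by "Task.WhenAll", which is an AggregateException.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the lab's three tasks: two of them fail.
Task<string> orderDetailsTask = FailAsync("order details unavailable");
Task<string> customerTask = Task.FromResult("Customer 42");
Task<string> productTask = FailAsync("products unavailable");

Task<string[]> all = Task.WhenAll(orderDetailsTask, customerTask, productTask);
string firstMessage = "";
try
{
    await all;
}
catch (Exception ex)
{
    // "await" rethrows only the first of the exceptions.
    firstMessage = ex.Message;
}

// The WhenAll task still holds every exception in an AggregateException.
AggregateException aggregate = all.Exception!;
string[] allMessages = aggregate.InnerExceptions.Select(e => e.Message).ToArray();
Console.WriteLine($"Caught: {firstMessage}");
Console.WriteLine($"All: {string.Join(", ", allMessages)}");

static async Task<string> FailAsync(string message)
{
    await Task.Delay(10);
    throw new InvalidOperationException(message);
}
```

The catch block only sees "order details unavailable"; "products unavailable" is visible only through "InnerExceptions".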

Interdependent Tasks

Now that we have a bit of an introduction to how "Task.WhenAll" behaves, let's look at a situation where the tasks are dependent on each other. As we'll see, this can cause us some problems.

I refactored the code originally provided by Edington Watt and the WTW ICT Technology team to show different behaviors in the code. You can get the original code (and the various solutions) here: https://github.com/jeremybytes/channel-exceptions.

Starting Code

We'll start at the root of the application. (The code can be found in the "Program.cs" file of the "refactored" project.)


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

When we "await" a Task that throws an exception, that exception is raised in our code. (If we do not await the Task, then we have to go looking for the exception in the Task properties.)

The idea behind this code is that if an exception is thrown in the main "ProducerConsumerWithExceptions" method (that returns a Task), we can catch it and output a message to the console.

Before looking at the behavior, let's see what "ProducerConsumerWithExceptions" does.


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

This code uses the Producer/Consumer pattern with a Channel in the middle. The idea is that the Producer can produce data in parallel and then put the data onto the Channel. The Consumer reads data off of the Channel and does something with it (in this case, it outputs it to the console).

This code has an "await Task.WhenAll". So this method will not exit until both of these tasks (the producer and the consumer) have completed. And if either of the tasks throws an exception, it will be raised here. Since we are not handling the exception in this method, it will bubble up and be handled in the "Main" method above.

Success State
If both the producer and consumer complete successfully, we get output similar to the following:


...
Producing something: 12
Consuming object: 11
Producing something: 13
Consuming object: 12
Producing something: 14
Consuming object: 13
Producing something: 15
Consuming object: 14
Producing something: 16
Consuming object: 15
Producing something: 17
Consuming object: 16
Producing something: 18
Consuming object: 17
Producing something: 19
Consuming object: 18
Consuming object: 19
Done

This is just showing the end part of the output, but we can see that the producer is producing objects and the consumer is consuming objects. Once 20 items have been produced (starting with index 0), the application prints "Done" and exits.

Error Behavior
Things get strange when we get an exception. Here is the output when the Consumer throws an exception on reading the first record (index 0):


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11

From the output, the Producer produced 12 items (and then stopped). The Consumer throws an exception while reading the first record, so we only see output for 1 item here.

At first it seems strange that we do not see the exception in the output; after all, the try/catch block in the Main method is there to print it.

But here's the real problem: The application is still running!

Interdependent Tasks
The application is stuck on awaiting Task.WhenAll.


    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

The code is hung because the Producer and Consumer are interdependent. The reason for this is a bit subtle if you're not familiar with how Channel<T> works in C#.

On the first line of this method, we create a bounded channel. A bounded channel is limited to holding a certain number of items. In this case, the channel is limited to holding 10 items. Once the channel has reached capacity, no new items can be written until one has been read off.
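Here's a small sketch of the capacity limit on its own, with no Producer or Consumer involved. It relies on the default full mode for a bounded channel (BoundedChannelFullMode.Wait), where TryWrite returns false once the channel is full; the class and method names are hypothetical.

```csharp
using System;
using System.Threading.Channels;

public static class BoundedCapacityDemo
{
    // Returns how many items TryWrite accepted before the channel reported that it was full.
    public static int FillChannel(int capacity, int attempts)
    {
        // Default FullMode is Wait: when full, TryWrite returns false and WriteAsync waits.
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity));
        int accepted = 0;
        for (int i = 0; i < attempts; i++)
        {
            if (channel.Writer.TryWrite(i))
                accepted++;
        }
        return accepted;
    }

    // Reading one item off a full channel frees a slot, so the next write succeeds.
    public static bool ReadFreesSpace(int capacity)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity));
        for (int i = 0; i < capacity; i++)
            channel.Writer.TryWrite(i);

        bool fullBefore = !channel.Writer.TryWrite(-1);
        channel.Reader.TryRead(out _);
        bool writeAfterRead = channel.Writer.TryWrite(-1);
        return fullBefore && writeAfterRead;
    }
}
```

FillChannel(10, 15) should return 10, and ReadFreesSpace(10) should return true. A writer that uses WriteAsync instead of TryWrite does not get "false" back at this point; it waits until a reader makes room.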

So here's what is happening. The Producer starts producing data (with index 0) and putting it onto the channel. The Consumer reads the first item (index 0) off of the channel and then throws an exception. The Consumer task is now faulted (meaning an exception was thrown in the Task), which also means the Consumer task is now "complete".

But the Producer keeps working. It keeps producing records until the channel reaches capacity (holding indexes 1 through 10). It then produces index 11 and then pauses. The code to write to the channel is waiting (in an async-friendly way) for there to be space in the channel. But the channel is at capacity. And since the Consumer is faulted, no more items will be read off of the channel. So the Producer is left waiting forever.

Since the Producer does not complete, the "await Task.WhenAll" will also wait forever. Because of this, the exception is never raised in this part of the code, so the "Main" try/catch block never has a chance to handle it. The application will never complete on its own.
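The hang can be reproduced in a self-contained sketch. The Producer and Consumer below are hypothetical simplifications, not the code from the repository: the Producer writes 20 items with WriteAsync, and the Consumer reads one item and then throws. So that the sketch itself doesn't hang, it races Task.WhenAll against a timeout using Task.WhenAny (which does not rethrow exceptions).

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WhenAllHangDemo
{
    // Hypothetical producer: writes 20 items, waiting whenever the channel is full.
    static async Task Producer(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 20; i++)
            await writer.WriteAsync(i);
        writer.Complete();
    }

    // Hypothetical consumer: reads the first item, then fails.
    static async Task Consumer(ChannelReader<int> reader)
    {
        int item = await reader.ReadAsync();
        throw new Exception($"Bad thing happened in Consumer ({item})");
    }

    // Returns true if Task.WhenAll is still pending after the timeout (that is, it is hung).
    public static async Task<bool> WhenAllIsStuck(TimeSpan timeout)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        Task all = Task.WhenAll(producer, consumer);

        // Whichever finishes first wins; the delay wins because "all" never completes.
        Task finished = await Task.WhenAny(all, Task.Delay(timeout));
        return finished != all && consumer.IsFaulted && !producer.IsCompleted;
    }
}
```

WhenAllIsStuck(TimeSpan.FromMilliseconds(500)) should return true: the Consumer is faulted, the Producer is still waiting for space, and the Task returned by Task.WhenAll never completes.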

One Solution - "await" Tasks Separately

One solution to this particular problem is to "await" the Producer and Consumer tasks separately. Here is what that code looks like.  (The code can be found in the "Program.cs" file of the "separate-await" project.)

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await consumer;
        await producer;
    }

Notice that instead of using "await Task.WhenAll", we now have separate "await consumer" and "await producer" sections. Since our producer is dependent upon the consumer making space available in the channel, it is best to wait for the consumer first here. (It's okay if the subtleties of that get lost; parallel async code is "fun".)

Here is the output from the code above:

Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Bad thing happened in Consumer (0)
Producing something: 4
Done

The output shows the Producer producing items (indexes 0 through 4 in this case). The Consumer pulls the first item (index 0) from the channel and then throws an exception.

The line "await consumer" above waits for the Consumer to complete. As soon as the Consumer task throws an exception, it is complete (in the "faulted" state). And since we awaited the faulted task, the exception gets raised.

The exception is not handled in this method, so the method short-circuits (meaning we never get to the "await producer" line). The exception bubbles up to the "Main" method, and that try/catch block outputs the exception message to the console ("Bad thing happened in Consumer (0)").

Most importantly, we see the "Done" message which means that our application exited on its own.

Side note: We get the message "Producing something: 4" after the exception message because the Producer is still running its code concurrently. So, the Producer has a chance to produce one more item before the application exits.
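Here is a minimal sketch of the same idea using the hypothetical simplified Producer and Consumer (again, not the repository's code), with the tasks awaited separately. The Run method stands in for the try/catch in Main and returns the message instead of printing it.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SeparateAwaitDemo
{
    // Hypothetical producer: writes 20 items, waiting whenever the channel is full.
    static async Task Producer(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 20; i++)
            await writer.WriteAsync(i);
        writer.Complete();
    }

    // Hypothetical consumer: reads the first item, then fails.
    static async Task Consumer(ChannelReader<int> reader)
    {
        int item = await reader.ReadAsync();
        throw new Exception($"Bad thing happened in Consumer ({item})");
    }

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await consumer;  // the Consumer is faulted, so its exception is raised here
        await producer;  // not reached when the Consumer fails
    }

    // Stands in for the try/catch in Main: returns the message it would print.
    public static async Task<string> Run()
    {
        try
        {
            await ProducerConsumerWithExceptions();
            return "no exception";
        }
        catch (Exception ex)
        {
            return ex.Message;
        }
    }
}
```

Run() should return "Bad thing happened in Consumer (0)". The Producer task is left waiting for space; in a console application that is not a problem, because the process exits once Main returns.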

Task.WhenAll and Interdependent Tasks

So we've seen that if we have interdependent tasks, awaiting Task.WhenAll can lead our program to hang. In this situation, one task failing (the consumer) prevented the other task from completing (the producer). Since not all tasks complete, "await Task.WhenAll" ends up waiting forever.

If we have interdependent tasks, it is better to await the tasks separately. If we have tasks which are not dependent (such as the parallel and related examples above), then "await Task.WhenAll" usually works just fine.
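For comparison, here's a sketch of independent tasks with Task.WhenAll. The Work method is hypothetical: three tasks each delay briefly, and one of them throws. Since no task waits on another, WhenAll completes, and awaiting it raises the exception.

```csharp
using System;
using System.Threading.Tasks;

public static class IndependentWhenAll
{
    // Hypothetical independent work item; optionally fails after a short delay.
    static async Task Work(int id, bool fail)
    {
        await Task.Delay(20);
        if (fail)
            throw new InvalidOperationException($"Task {id} failed");
    }

    public static async Task<(string Message, int FaultedCount)> Run()
    {
        Task[] tasks = { Work(1, false), Work(2, true), Work(3, false) };
        Task all = Task.WhenAll(tasks);
        try
        {
            await all;
            return ("no exception", 0);
        }
        catch (Exception ex)
        {
            // "await" surfaces the first exception; all.Exception holds every failure.
            return (ex.Message, all.Exception?.InnerExceptions.Count ?? 0);
        }
    }
}
```

Run() should return ("Task 2 failed", 1). Note that "await" surfaces only the first exception; the full set is available on the Exception property of the Task returned by Task.WhenAll.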

More Solutions

There are other ways to fix the strange behavior of the sample application. You can take a look at some of the other solutions in the GitHub repository: https://github.com/jeremybytes/channel-exceptions.

Another solution is to change the bounded channel to an unbounded channel (in the "unbounded-channel" project). This gets rid of the capacity limitation and removes the dependency between the Producer task and Consumer task. [Update - article available: Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels]

Both this solution and the one presented in this article are short-circuiting -- meaning, if the Consumer throws an exception on one item, no other items will be consumed. The same is true of the Producer.

The GitHub repository also provides a more robust implementation (in the "doesnt-stop" project). In this project, if the Consumer throws an exception while processing an item, it will log the error and continue processing. (The same is true of the Producer in this project.)
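Here's a rough sketch of what a consumer that "doesn't stop" could look like. This is an assumption about the general approach, not the code from the "doesnt-stop" project: the try/catch moves inside the loop, so a failure on one item is recorded and the loop moves on to the next item. The Process method (which fails on every fifth item) and the error list that stands in for a log are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class KeepGoingDemo
{
    // Hypothetical per-item work: fails for every item divisible by 5.
    static int Process(int item)
    {
        if (item % 5 == 0)
            throw new InvalidOperationException($"Bad thing happened in Consumer ({item})");
        return item;
    }

    // Consumer that handles exceptions per item, records them, and keeps reading.
    static async Task Consumer(ChannelReader<int> reader, List<int> processed, List<string> errors)
    {
        await foreach (int item in reader.ReadAllAsync())
        {
            try
            {
                processed.Add(Process(item));
            }
            catch (Exception ex)
            {
                errors.Add(ex.Message);  // "log" the failure so the item can be redone later
            }
        }
    }

    // Hypothetical producer: writes the items, then completes the channel so the consumer's loop ends.
    static async Task Producer(ChannelWriter<int> writer, int count)
    {
        for (int i = 0; i < count; i++)
            await writer.WriteAsync(i);
        writer.Complete();
    }

    public static async Task<(int Processed, int Failed)> Run(int count)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        var processed = new List<int>();
        var errors = new List<string>();

        Task producer = Producer(channel.Writer, count);
        Task consumer = Consumer(channel.Reader, processed, errors);
        await consumer;
        await producer;
        return (processed.Count, errors.Count);
    }
}
```

Run(20) should report 16 processed items and 4 failures (items 0, 5, 10, and 15). Since neither task faults, nothing short-circuits, and the error list tells us which items need to be redone.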

Wrap Up

As we dive deeper into different ways of using asynchronous and parallel bits in our code, we often find strange behavior. And sometimes it can take a while to figure out where the strangeness is coming from. But the more that we learn, the easier it gets. So keep learning and keep writing amazing applications that make your users happy.

Happy Coding!

Monday, September 25, 2023

Last Chance: Full Day Workshop on Asynchronous and Parallel Programming in C#

This is the last public workshop I have scheduled on asynchronous programming. Next year, I've got a whole new workshop coming. So if you've been putting off attending, you'd better take the opportunity now!

On Sunday, November 12, 2023, I'll be giving a full day workshop (with hands-on labs) at LIVE! 360 in Orlando, Florida. This is your chance to spend a full day with me and also learn tons of useful tech during the rest of the week.

Event Info:

LIVE! 360 November 12-17, 2023
Royal Pacific Resort at Universal
Orlando, FL
Event Link: https://live360events.com/Events/Orlando-2023/Home.aspx

Use the promo code "Clark" to save $500 off the regular price for 4-, 5-, or 6-day packages (Note: you'll need the 6-day package to join me for the full day workshop on Sunday, November 12). Here's a direct link to registration that includes the promo code: bit.ly/3LuBLrd

Read on to see what we'll learn in the workshop.

Hands-on Lab: Asynchronous and Parallel Programming in C#

11/12/2023 9:00 a.m. - 6:00 p.m.
Level: Intermediate

Asynchronous programming is a critical skill to take full advantage of today's multi-core systems. But async programming brings its own set of issues. In this workshop, we'll work through some of those issues and get comfortable using various parts of the .NET Task Parallel Library (TPL).

We'll start by calling asynchronous methods using the Task Asynchronous Pattern (TAP), including how to handle exceptions and cancellation. With this in hand, we'll look at creating our own asynchronous methods and methods that use asynchronous libraries. Along the way, we'll see how to avoid deadlocks, how to isolate our code for easier async, and why it's important to stay away from "async void".

In addition, we'll look at some patterns for running code in parallel, including using Parallel.ForEachAsync, channels, and other techniques. We'll see pros and cons so that we can pick the right pattern for a particular problem.

Throughout the day, we'll go hands-on with lab exercises to put these skills into practice.

Objectives:

  • Use asynchronous methods with Task and await 
  • Create asynchronous methods and libraries 
  • Learn to avoid deadlocks and other pitfalls 
  • Understand different parallel programming techniques

Topics:

Here's a list of some of the topics that we'll cover:

Pre-Requisites:

Basic understanding of C# and object-oriented programming (classes, inheritance, methods, and properties). No prior experience with asynchronous programming is necessary; we'll take care of that as we go.

Attendee Requirements:

  • You must provide your own laptop computer (Windows, Mac, or Linux) for this hands-on lab.
  • All other laptop requirements will be provided to attendees 2 weeks prior to the conference

Hope to See You There!

This is the last scheduled public asynchronous programming workshop. If you can't make it to this one, I am available for private workshops for your team - customized to be most relevant to the code that you're building. Next year, I've got a whole new workshop coming, so keep watching here (and my website: jeremybytes.com) for future events.

Happy Coding!

Wednesday, August 23, 2023

New Video: 'await' Return Types

A new video is available on my YouTube channel: Why do you have to return a Task when you use "await" in a C# method? The video is a quick walkthrough of code based on an article from earlier this week (link below).

Whenever we "await" something in a C# method, the return value is automatically wrapped in a Task, and the return type for the method must include the Task as well. This leads to some strange looking code: the code in the method returns one thing (such as a Person object), but the return type for the method returns another (a Task of Person). In this video, we will look at some code to try to understand this a bit better.

The code compares using "await" with what happens if we were to use "Task" manually. The comparison helps my brain process the disconnect between return types. Hopefully it will help you as well.


Article:

More videos are on the way.

Happy Coding!

Monday, August 21, 2023

Why Do You Have to Return "Task" Whenever You "await" Something in a Method in C#?

There is something that has always bothered me in C#: Whenever you "await" something in a method, the return value must be wrapped in a Task.

Note: If you prefer video to text, take a look here: YouTube: Why do you have to return a Task when you use await in a C# method?

The Behavior

Let's start with an example:


    public Person GetPerson(int id)
    {
        List<Person> people = GetPeople();
        Person selectedPerson = people.Single(p => p.Id == id);
        return selectedPerson;
    }

    public async Task<Person> GetPersonAsync(int id)
    {
        List<Person> people = await GetPeopleAsync();
        Person selectedPerson = people.Single(p => p.Id == id);
        return selectedPerson;
    }

The first method (GetPerson) does not have any asynchronous code. It calls the "GetPeople" method that returns a List of Person objects. Then it uses a LINQ method (Single) to pick out an individual Person. Lastly it returns that person. As expected, the return type for the "GetPerson" method is "Person".

The second method (GetPersonAsync) does have asynchronous code. It calls the "GetPeopleAsync" method (which returns a Task<List<Person>>) and then awaits the result. This also gives us a List of Person objects. Then it uses the same LINQ method (Single) to get the selected Person. Lastly it returns that person.

But here's where things are a bit odd: even though our return statement ("return selectedPerson") returns a "Person" type, the method itself ("GetPersonAsync") returns "Task<Person>".
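To try the comparison on your own, here's a self-contained version of the two methods. The Person record, the sample data, and the bodies of GetPeople and GetPeopleAsync are hypothetical stand-ins for the article's data source.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical data type standing in for the article's Person class.
public record Person(int Id, string Name);

public class PersonReader
{
    // Hypothetical synchronous data source.
    List<Person> GetPeople() => new List<Person>
    {
        new Person(1, "John"),
        new Person(2, "Mary"),
    };

    // Hypothetical asynchronous data source that simulates a slow call.
    async Task<List<Person>> GetPeopleAsync()
    {
        await Task.Delay(10);
        return GetPeople();
    }

    public Person GetPerson(int id)
    {
        List<Person> people = GetPeople();
        Person selectedPerson = people.Single(p => p.Id == id);
        return selectedPerson;
    }

    // The body returns a Person, but because the method awaits, its return type is Task<Person>.
    public async Task<Person> GetPersonAsync(int id)
    {
        List<Person> people = await GetPeopleAsync();
        Person selectedPerson = people.Single(p => p.Id == id);
        return selectedPerson;
    }
}
```

If we change the return type of GetPersonAsync from Task<Person> to Person, the compiler reports an error, since Person is not a valid return type for an async method.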

A Leaky Abstraction

I love using "await" in my code. It is so much easier than dealing with the underlying Tasks directly, and it handles the 95% scenario for me (meaning, 95% of the time, it does what I need -- I only need to drop back to using Task directly when I need more flexibility).

I also really like how "await" lets me write asynchronous code very similarly to how I write non-asynchronous code. Exceptions are raised as expected, and I can use standard try/catch/finally blocks. For the most part, I do not have to worry about "Task" at all when using "await".

It is a very good abstraction over Task.

But it does "leak" in this one spot. A leaky abstraction is one where the underlying implementation shows through. And the return type of a method that uses "await" is one spot where the underlying "Task" implementation leaks through.

This isn't necessarily a bad thing. All abstractions leak to a certain extent. But this particular one has bugged me for quite a while. And it can be difficult to grasp for developers who may not have worked with Task directly.

A Better Understanding

Most of the descriptions I've seen have just said "If you use 'await' in your method, the return type is automatically wrapped in a Task." And there's not much more in the way of explanation.

To get a better understanding of why things work this way, let's get rid of the abstraction and look at using "Task" directly for this code, building things up one line at a time.

If you would like more information on how to use Task manually (along with how to use 'await'), you can take a look at the resources available here: I'll Get Back to You: Task, Await, and Asynchronous Methods in C#.

Step 1: The Method Signature

Let's start with the signature that we would like to have:


    public Person GetPersonAsyncWithTask(int id)
    {

    }

The idea is that I can pass the "id" of a person to this method and get a Person instance back. So this would be my ideal signature.

Step 2: Call the Async Method

Next, we'll call the "GetPeopleAsync" method, but instead of awaiting it, we'll get the actual Task back. When I don't know the specific type that comes back from a method, I'll use "var result" to capture the value and then let Visual Studio help me. Here's that code:


    public Person GetPersonAsyncWithTask(int id)
    {
        var result = GetPeopleAsync();
    }

If we hover the cursor over "var", Visual Studio tells us the type we can expect:



This tells us that "result" is "Task<TResult>" and that "TResult is List<Person>". This means that "GetPeopleAsync" returns "Task<List<Person>>". So let's update our code to be more explicit:


    public Person GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
    }

Now we have our explicit type, along with a better name for the result: peopleTask.

Step 3: Add a Continuation

The next step is to add a continuation to the Task. By adding a continuation, we are telling our code that after the Task is complete, we would like to "continue" by doing something else.

This is done with the "ContinueWith" method on the task:


    public Person GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
        peopleTask.ContinueWith(task =>
        {

        });        
    }

The "ContinueWith" method takes a delegate (commonly inlined using a lambda expression). In this case, the delegate takes the "peopleTask" as a parameter (which we'll call "task" in the continuation).

For more information on delegates and lambda expressions, you can take a look at the resources available here: Get Func-y: Understanding Delegates in C#.
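Here's a small sketch of that parameter in action. It checks that the "task" parameter in the continuation is the very same object as the Task we called ContinueWith on, and that it has already completed by the time the continuation runs. GetNamesAsync is a hypothetical stand-in for GetPeopleAsync.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ContinuationParameterDemo
{
    // Hypothetical asynchronous source standing in for GetPeopleAsync.
    static Task<List<string>> GetNamesAsync() =>
        Task.Run(() => new List<string> { "John", "Mary" });

    public static async Task<(bool SameTask, bool WasCompleted, int Count)> Inspect()
    {
        Task<List<string>> namesTask = GetNamesAsync();

        // "task" is the antecedent: the same object as namesTask, already completed
        // by the time the continuation runs, so reading task.Result does not block.
        Task<(bool, bool, int)> check = namesTask.ContinueWith(task =>
            (object.ReferenceEquals(task, namesTask), task.IsCompleted, task.Result.Count));

        return await check;
    }
}
```

Inspect() should return (true, true, 2).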

Step 4: Fill in the Continuation Code

The next step is to fill in the body of our continuation. This is basically "the rest of the method" that we had in our non-asynchronous version:


    public Person GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
        peopleTask.ContinueWith(task =>
        {
            List<Person> people = task.Result;
            Person selectedPerson = people.Single(p => p.Id == id);
            return selectedPerson;
        });        
    }

The first line in the continuation takes the "Result" property of the task (which is the List<Person>) and assigns it to a variable with a friendlier name: "people". Then we use the "Single" method like we did above to get an individual record from the list. Then we return that selected "Person" object.

But Now We Have a Problem

But now we have a problem: we want to return the "selectedPerson" from the "GetPersonAsyncWithTask" method, but it is being returned inside the continuation of the Task instead.

How do we get this out?

It turns out that "ContinueWith" returns a value. Let's use the same technique we used above to figure out what that is.

Step 5: Getting a Return from ContinueWith


    public Person GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
        var result = peopleTask.ContinueWith(task =>
        {
            List<Person> people = task.Result;
            Person selectedPerson = people.Single(p => p.Id == id);
            return selectedPerson;
        });        
    }

Here we have added "var result = " in front of the call to "peopleTask.ContinueWith". Then if we hover the mouse over "var", we see that this is "Task<TResult>" and "TResult is Person". So this tells us that "ContinueWith" returns a "Task<Person>".

So let's be more explicit with our variable:


    public Person GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
        Task<Person> result = peopleTask.ContinueWith(task =>
        {
            List<Person> people = task.Result;
            Person selectedPerson = people.Single(p => p.Id == id);
            return selectedPerson;
        });        
    }

Now our "result" variable is specifically typed as "Task<Person>".
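Whether ContinueWith gives back a Task or a Task<T> depends on the delegate we pass in. Here's a short sketch: a lambda with no return value picks the overload that returns a plain Task, and a lambda that returns a value picks the overload that returns Task<TNewResult>. GetCountAsync is hypothetical.

```csharp
using System.Threading.Tasks;

public static class ContinueWithReturnTypes
{
    // Hypothetical asynchronous source.
    static Task<int> GetCountAsync() => Task.FromResult(3);

    public static async Task<string> Run()
    {
        Task<int> countTask = GetCountAsync();

        // Lambda with no return value: the Action overload, which returns a plain Task.
        Task noResult = countTask.ContinueWith(task => { });

        // Lambda that returns a string: the Func overload, which returns Task<string>.
        Task<string> withResult = countTask.ContinueWith(task => $"Count: {task.Result}");

        await noResult;
        return await withResult;
    }
}
```

Run() should return "Count: 3". This also explains Step 3: while the continuation body was empty, ContinueWith returned a plain Task; adding "return selectedPerson" is what turns the result into Task<Person>.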

Step 6: Return the Result

The last step is to return the result:


    public Person GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
        Task<Person> result = peopleTask.ContinueWith(task =>
        {
            List<Person> people = task.Result;
            Person selectedPerson = people.Single(p => p.Id == id);
            return selectedPerson;
        });
        return result;
    }

But we can't stop here. Our return types do not match. The method (GetPersonAsyncWithTask) returns a "Person", and the actual type we return is "Task<Person>".

So we need to update our method signature:


    public Task<Person> GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
        Task<Person> result = peopleTask.ContinueWith(task =>
        {
            List<Person> people = task.Result;
            Person selectedPerson = people.Single(p => p.Id == id);
            return selectedPerson;
        });
        return result;
    }

Now the method returns a "Task<Person>". And this is what we want. If someone awaits what comes back from this method, they will get a "Person" object back. In addition, someone could take this task and set up their own continuation. This is just the nature of Task and how we set up asynchronous code using Task.
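Here's a sketch of both kinds of callers. It uses a copy of the GetPersonAsyncWithTask method along with a hypothetical Person record and GetPeopleAsync data source. One caller awaits the Task<Person>; the other attaches its own continuation.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical data type standing in for the article's Person class.
public record Person(int Id, string Name);

public class ManualTaskReader
{
    // Hypothetical asynchronous data source.
    Task<List<Person>> GetPeopleAsync() => Task.Run(() => new List<Person>
    {
        new Person(1, "John"),
        new Person(2, "Mary"),
    });

    public Task<Person> GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
        Task<Person> result = peopleTask.ContinueWith(task =>
        {
            List<Person> people = task.Result;
            Person selectedPerson = people.Single(p => p.Id == id);
            return selectedPerson;
        });
        return result;
    }
}

public static class Callers
{
    // Caller 1: await the Task<Person> and get a Person back.
    public static async Task<string> AwaitIt(ManualTaskReader reader)
    {
        Person person = await reader.GetPersonAsyncWithTask(2);
        return person.Name;
    }

    // Caller 2: set up its own continuation instead of awaiting.
    public static Task<string> ContinueIt(ManualTaskReader reader) =>
        reader.GetPersonAsyncWithTask(2).ContinueWith(task => task.Result.Name.ToUpperInvariant());
}
```

AwaitIt should produce "Mary", and ContinueIt should produce "MARY".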

Back to the Comparison

So let's go back to our method comparison. But instead of comparing a method with "await" to a non-asynchronous method, let's compare a method with "await" to a method that handles the asynchronous Task manually.


    public async Task<Person> GetPersonAsync(int id)
    {
        List<Person> people = await GetPeopleAsync();
        Person selectedPerson = people.Single(p => p.Id == id);
        return selectedPerson;
    }

    public Task<Person> GetPersonAsyncWithTask(int id)
    {
        Task<List<Person>> peopleTask = GetPeopleAsync();
        Task<Person> result = peopleTask.ContinueWith(task =>
        {
            List<Person> people = task.Result;
            Person selectedPerson = people.Single(p => p.Id == id);
            return selectedPerson;
        });
        return result;
    }

Here's how we can think of this code now (and the compiler magic that happens behind it): anything after the "await" is automatically put into a continuation. What the compiler does is much more complicated, but this is how we can think of it from the programmer's perspective.

If we think about it this way, it's easier to see why the method returns "Task<Person>" instead of just "Person".
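The mental model holds up on the success path, but it's worth seeing one place where the compiler does more than a plain continuation. In this sketch (with a hypothetical GetPeopleAsync that can be told to fail), both versions return the same Person on success. On failure, the "await" version raises the original InvalidOperationException, while the ContinueWith version raises an AggregateException, because reading task.Result on a faulted Task wraps its exception.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical data type standing in for the article's Person class.
public record Person(int Id, string Name);

public static class CompareVersions
{
    // Hypothetical data source that fails on demand so we can compare error behavior.
    static Task<List<Person>> GetPeopleAsync(bool fail) =>
        fail
            ? Task.FromException<List<Person>>(new InvalidOperationException("data source failed"))
            : Task.FromResult(new List<Person> { new Person(1, "John"), new Person(2, "Mary") });

    public static async Task<Person> GetPersonAsync(int id, bool fail)
    {
        List<Person> people = await GetPeopleAsync(fail);  // rethrows the original exception
        return people.Single(p => p.Id == id);
    }

    public static Task<Person> GetPersonAsyncWithTask(int id, bool fail)
    {
        return GetPeopleAsync(fail).ContinueWith(task =>
        {
            List<Person> people = task.Result;  // on a faulted task, throws AggregateException
            return people.Single(p => p.Id == id);
        });
    }

    // Returns the type name of the exception an awaiting caller sees ("none" on success).
    public static async Task<string> ExceptionSeenBy(Func<Task<Person>> call)
    {
        try
        {
            await call();
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }
}
```

On success, both methods should return Mary for id 2. On failure, ExceptionSeenBy reports "InvalidOperationException" for GetPersonAsync and "AggregateException" for GetPersonAsyncWithTask.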

"await" is Pretty Magical

When we use "await" in our code, magical things happen behind the scenes. (And I'm saying that as a good thing.) The compiler gets to figure out all of the details of waiting for a Task to complete and what to do next. When I have this type of scenario (the 95% scenario), then it is pretty amazing. It saves me a lot of hassle and lets me work with code that looks more "normal".

The abstraction that "await" gives us is not perfect, though. It does "leak" a little bit. Whenever we "await" something in a method, the return value gets automatically wrapped in a Task. This means that we do need to change the return type of the method to Task. But it is a small price to pay for the ease of using "await".

Wrap Up

I've always had a bit of trouble with having to return a Task from any method that "awaits" something. I mean, I knew that I had to do it, but there was always a bit of cognitive dissonance with saying "the code in this method returns 'Person', but the method itself returns 'Task<Person>'."

By working through this manually, my brain is a little more comfortable. I have a better understanding of what is happening behind the "await", and I can see that if we were to do everything manually, we would end up returning a Task. So it makes sense that after we add the magic of "await", the method still needs to return a Task.

I hope that this has helped you understand things a bit as well. If you have any questions or other ways of looking at this, feel free to leave a comment.

Happy Coding!