Friday, October 6, 2023

Producer/Consumer Exception Handling - A More Reliable Approach

In the last 2 articles, we looked at a couple of ways to deal with exceptions that can happen when using the Producer/Consumer pattern along with Channel<T> in C#. (Prior articles: "Don't Use 'Task.WhenAll' for Interdependent Tasks" and "Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels".)

Both of these approaches are short-circuiting, meaning that if the Consumer fails on one item, no further items are processed. The same is true of the Producer: if the Producer fails on one item, no further items are produced. This approach may work fine for particular applications. But what if we want something more robust?

By handling exceptions inside the Producer and Consumer, errors can be logged and processing can continue. We can go back and look at the logged items to determine which items need to be redone. This is a more robust approach to exception handling for our sample application.

Motivation

This article is based on a question sent in by Edington Watt regarding a presentation that I did about Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed strange behavior when exceptions are thrown, and he asked the question "What are the best exception handling practices in the Consumer Producer approach using Channel<T>?"

Today we'll look at handling exceptions in a way that does not short-circuit the process.

Note: The full source code (including the original sample code and various approaches to fixing the strange behavior) is available here: https://github.com/jeremybytes/channel-exceptions.

Interdependent Tasks

We'll go back to the starting place we used in the previous article. Here are the relevant methods (available in the Program.cs file of the "refactored" project):


    static async Task Main(string[] args)
    {
        try
        {
            await ProducerConsumerWithExceptions();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        Console.WriteLine("Done");
    }

    static async Task ProducerConsumerWithExceptions()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));

        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

I won't go through this code again here. Check the prior article for a walkthrough. This code works fine for the success state, but it has strange behavior if the Consumer throws an exception.

Consumer Exception

If the Consumer throws an exception while processing an item, the entire application hangs:


Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11

Here's a reminder of what happens:

In the "ProducerConsumerWithExceptions" method, we create a Bounded Channel. This means that the capacity of the channel is limited to 10 items. When the channel has reached that capacity, no new items can be written to it until space is made available for them.

The Consumer reads one item off of the channel (index 0) and then throws an exception. The Consumer then stops, and no further items are read off of the channel.

The Producer produces data and puts it onto the channel. The first item produced (index 0) is pulled off of the channel by the Consumer. The Producer keeps going and puts 10 more items onto the channel (indexes 1 through 10).

Then the Producer produces index 11 and tries to put it onto the channel. At this point, the channel is at capacity (it has 10 items already), so the Producer waits (in an async-friendly way) for there to be space to write the next item.

And this is where the application hangs. The Producer will wait forever. Since "Task.WhenAll" is waiting for the Producer and Consumer to both finish, it will also wait forever. The application will not exit on its own.
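
To see the waiting behavior in isolation, here is a minimal sketch (not part of the sample repository). It assumes a capacity of 2 instead of 10 to keep things short, and the class and method names are made up for illustration. The third write cannot finish until something reads from the channel.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedChannelWaitSketch
{
    // Returns (write was pending while the channel was full, write completed after a read).
    public static async Task<(bool WasPending, bool Completed)> RunAsync()
    {
        // Assumption for this sketch: capacity of 2 rather than the article's 10.
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2));

        await channel.Writer.WriteAsync(1);
        await channel.Writer.WriteAsync(2);

        // The channel is now full, so this write cannot complete yet.
        Task pendingWrite = channel.Writer.WriteAsync(3).AsTask();
        bool wasPending = !pendingWrite.IsCompleted;

        // Reading one item frees a slot, which lets the pending write finish.
        await channel.Reader.ReadAsync();
        await pendingWrite;

        return (wasPending, pendingWrite.IsCompletedSuccessfully);
    }

    public static async Task Main()
    {
        var (wasPending, completed) = await RunAsync();
        Console.WriteLine($"Pending while full: {wasPending}");
        Console.WriteLine($"Completed after read: {completed}");
    }
}
```

In the hanging application, nothing ever performs that read once the Consumer has stopped, so the pending write never completes.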

No Handling of Exceptions in the Producer/Consumer

One cause of the behavior is that the Producer and Consumer do not handle their own exceptions. They let them bubble up and (hopefully) get caught in the Main method's try/catch block.

Consumer

Here is the code for the Consumer (in the "Program.cs" file in the "refactored" project):


    static async Task Consumer(ChannelReader<int> reader)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Consuming object: {item}");
            MightThrowExceptionForConsumer(item);
        }
    }

This uses the "ReadAllAsync" method on the "ChannelReader<T>" class to read the items off of the Channel as they become available.

The call to "MightThrowExceptionForConsumer" throws an exception. (Although it has "might" in the name, it is hard-coded to always throw an exception here.) And because the exception is unhandled, the "foreach" loop will exit.

This relies on the exception eventually making its way back to the "Main" method where it will be caught and logged in the catch block.
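
The catch is that "eventually" depends on every task passed to "Task.WhenAll" finishing. Here is a small sketch of that, using stand-ins rather than the real Producer and Consumer: a task that has already failed, and a TaskCompletionSource that plays the part of a Producer that is stuck waiting. All names here are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    public static async Task<string> RunAsync()
    {
        // Stand-in for a Consumer that has already failed.
        Task consumer = Task.FromException(new InvalidOperationException("Consumer failed"));

        // Stand-in for a Producer that is stuck waiting; it finishes only when we signal it.
        var producerSignal = new TaskCompletionSource<bool>();
        Task producer = producerSignal.Task;

        Task both = Task.WhenAll(producer, consumer);

        // The Consumer has faulted, but WhenAll is still waiting on the Producer.
        bool stillWaiting = !both.IsCompleted;

        // Let the "Producer" finish. Only now can the exception surface.
        producerSignal.SetResult(true);

        try
        {
            await both;
            return "no exception";
        }
        catch (InvalidOperationException ex)
        {
            return $"waiting={stillWaiting}, caught: {ex.Message}";
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

If the signal never comes (as with a Producer blocked on a full channel), the await never returns and the catch block never runs.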

Producer

The Producer code is similar (in the same "Program.cs" file in the "refactored" project):


    static async Task Producer(ChannelWriter<int> writer)
    {
        try
        {
            for (int i = 0; i < 20; i++)
            {
                Console.WriteLine($"Producing something: {i}");
                MightThrowExceptionForProducer(i);
                await Task.Delay(10);
                await writer.WriteAsync(i);
            }
        }
        finally
        {
            writer.Complete();
        }
    }

This uses a "for" loop to generate 20 items and write them to the Channel using "WriteAsync". This could also throw an exception in the "MightThrowExceptionForProducer" method (although for this sample, it is hard-coded to not throw an exception).

If an exception is thrown in the "for" loop, the loop will exit before finishing. And although we do not have a "catch" block to catch an exception, we do have a "finally" block.

In the "finally" block we mark the ChannelWriter as "Complete". This lets the Consumer know that no new items will be added to the Channel, and the Consumer can stop waiting for new items. This is in a "finally" block because even if we get an exception, we want to make sure that the ChannelWriter is marked "Complete".

But as with the Consumer, the Producer relies on the exception bubbling up to the "Main" method where it should be logged in the "catch" block.
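
Here is a rough sketch of what the "finally" block buys us. It assumes a Producer that is hard-coded to fail on item 3 (the sample's Producer does not fail) and collects items in a list in place of the sample's Consumer. Because the writer is marked "Complete", the read loop ends on its own, and the exception is still there when the Producer task is awaited.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompleteInFinallySketch
{
    static async Task Producer(ChannelWriter<int> writer)
    {
        try
        {
            for (int i = 0; i < 5; i++)
            {
                // Assumed failure for this sketch: the Producer fails on item 3.
                if (i == 3)
                    throw new Exception($"Bad thing happened in Producer ({i})");
                await writer.WriteAsync(i);
            }
        }
        finally
        {
            // Runs even when the loop exits early because of the exception.
            writer.Complete();
        }
    }

    public static async Task<(List<int> Items, string Error)> RunAsync()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        Task producer = Producer(channel.Writer);

        // This loop ends once the buffered items are read, because the writer is complete.
        var items = new List<int>();
        await foreach (var item in channel.Reader.ReadAllAsync())
            items.Add(item);

        // The unhandled exception surfaces when the Producer task is awaited.
        string error = "";
        try { await producer; }
        catch (Exception ex) { error = ex.Message; }

        return (items, error);
    }

    public static async Task Main()
    {
        var (items, error) = await RunAsync();
        Console.WriteLine($"Items read: {string.Join(", ", items)}");
        Console.WriteLine($"Error: {error}");
    }
}
```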

Adding Exception Handling to the Producer and Consumer

A more reliable way of dealing with exceptions is to handle them inside the Producer and the Consumer. This also has the effect of removing the dependency between the Producer and Consumer that we saw earlier. With the dependency removed, we can use a Bounded Channel if we need to, and we can also use "Task.WhenAll" to wait for the Producer and Consumer tasks to complete.

The code for this section is available in the "doesnt-stop" project.

Consumer

Here is the code for the updated Consumer (in the "Program.cs" file in the "doesnt-stop" project):

    static async Task Consumer(ChannelReader<int> reader)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            try
            {
                Console.WriteLine($"Consuming object: {item}");
                MightThrowExceptionForConsumer(item);
                TotalConsumed++;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
            }
        }
    }

Here we have added a try/catch block inside of the foreach loop. This means that if an exception is thrown, it will be caught and logged here, and the loop will continue to the next item.

The Consumer is no longer short-circuiting. So we no longer need to worry about the Consumer stopping before it has read all of the items from the Channel.

One other new item is "TotalConsumed++". This is a static variable that is updated with the total number of items successfully processed (it is after the "MightThrow..." method). This lets us track how many items were successful in the Consumer.

Producer

We have something very similar in the Producer (in the "Program.cs" file in the "doesnt-stop" project):

    static async Task Producer(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 100; i++)
        {
            try
            {
                Console.WriteLine($"Producing something: {i}");
                MightThrowExceptionForProducer(i);
                await Task.Delay(10);
                TotalProduced++;
                await writer.WriteAsync(i);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
            }
        }

        writer.Complete();
    }

Inside the "for" loop, we have a try/catch block. So if an exception is thrown, it is caught and logged, and the loop processing will continue.

Marking the ChannelWriter "Complete" no longer needs to be in a "finally" block. We will not have any unhandled exceptions escape here (and if we did get one, it would probably be an unrecoverable one).

One other new item is "TotalProduced++". This is a static variable that is updated with the total number of items successfully produced (it is after the "MightThrow..." method). This lets us track how many items were successful in the Producer.
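
To try the two pieces together without the rest of the sample, here is a self-contained sketch. It is not the code from the "doesnt-stop" project: the failure rules are assumptions picked so the totals are predictable (the Producer fails on every 5th item, the Consumer fails on items divisible by 7), the delay is left out, and it produces 20 items.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DeterministicPipelineSketch
{
    static int TotalProduced;
    static int TotalConsumed;

    static async Task Producer(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 20; i++)
        {
            try
            {
                // Assumed rule for this sketch: every 5th item fails in the Producer.
                if (i % 5 == 0)
                    throw new Exception($"Bad thing happened in Producer ({i})");
                TotalProduced++;
                await writer.WriteAsync(i);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
            }
        }

        writer.Complete();
    }

    static async Task Consumer(ChannelReader<int> reader)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            try
            {
                // Assumed rule for this sketch: items divisible by 7 fail in the Consumer.
                if (item % 7 == 0)
                    throw new Exception($"Bad thing happened in Consumer ({item})");
                TotalConsumed++;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
            }
        }
    }

    public static async Task<(int Produced, int Consumed)> RunAsync()
    {
        TotalProduced = 0;
        TotalConsumed = 0;

        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        Task producer = Producer(channel.Writer);
        Task consumer = Consumer(channel.Reader);
        await Task.WhenAll(producer, consumer);

        return (TotalProduced, TotalConsumed);
    }

    public static async Task Main()
    {
        var (produced, consumed) = await RunAsync();
        Console.WriteLine($"Total Produced: {produced}");
        Console.WriteLine($"Total Consumed: {consumed}");
        Console.WriteLine("Done");
    }
}
```

With these rules, the Producer fails on 0, 5, 10, and 15, so 16 items reach the Channel. Of those, 7 and 14 fail in the Consumer, which leaves 14 consumed. The bounded channel and "Task.WhenAll" are both still in place, and the method runs to completion.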

Throwing Exceptions

To make the output more interesting, the methods that might throw exceptions use a random number generator to see if an exception should be thrown (in the "Program.cs" file in the "doesnt-stop" project):

    private static void MightThrowExceptionForProducer(int item)
    {
        if (Randomizer.Next() % 3 == 0)
            throw new Exception($"Bad thing happened in Producer ({item})");
    }

    private static void MightThrowExceptionForConsumer(int item)
    {
        if (Randomizer.Next() % 50 == 0)
            throw new Exception($"Bad thing happened in Consumer ({item})");
    }

The Producer throws an exception 1 out of 3 times (approximately). The Consumer throws an exception 1 out of 50 times (approximately).
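
The declaration of "Randomizer" is not shown above. Assuming it is a shared System.Random instance, a quick sketch like this one can check those approximate rates by counting how often "Next()" is evenly divisible by 3 and by 50. The fixed seed and the sample count are choices made for this sketch only.

```csharp
using System;

public static class FailureRateSketch
{
    // Assumption: the article's "Randomizer" is a shared System.Random instance.
    // The fixed seed is used here only so repeated runs behave the same way.
    static readonly Random Randomizer = new Random(42);

    // Fraction of calls where Next() is evenly divisible by the divisor.
    public static double MeasureRate(int divisor, int samples)
    {
        int hits = 0;
        for (int i = 0; i < samples; i++)
        {
            if (Randomizer.Next() % divisor == 0)
                hits++;
        }
        return (double)hits / samples;
    }

    public static void Main()
    {
        Console.WriteLine($"Producer failure rate: {MeasureRate(3, 30000):P1}");
        Console.WriteLine($"Consumer failure rate: {MeasureRate(50, 30000):P1}");
    }
}
```

The measured rates should land close to 33% and 2%, which lines up with "1 out of 3" and "1 out of 50".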

Program Output

So let's see how this program behaves. We only modified the Producer and Consumer code. The original "Main" and "ProducerConsumerWithExceptions" are unchanged. To give us more of a chance to get random exceptions, I also changed the code so that it produces 100 items (instead of 20).

Consuming object: 90
Logged: Bad thing happened in Producer (91)
Producing something: 92
Producing something: 93
Consuming object: 92
Producing something: 94
Consuming object: 93
Logged: Bad thing happened in Producer (94)
Producing something: 95
Producing something: 96
Consuming object: 95
Producing something: 97
Consuming object: 96
Producing something: 98
Consuming object: 97
Logged: Bad thing happened in Consumer (97)
Producing something: 99
Consuming object: 98
Consuming object: 99
Total Produced: 69
Total Consumed: 66
Done

This is just the tail end of the output. It shows several logged items (2 Producer errors and 1 Consumer error).

Notice that the Total Produced (69) and Total Consumed (66) do not match. The Producer logged errors for 31 of its 100 items, so only 69 items were written to the Channel, and the Consumer had a chance to process at most 69 items. The Consumer also logged errors for 3 of those items (only the last one appears in this excerpt), so its total is 3 lower: 66.

"Done" is printed out at the bottom. This lets us know that the application ran to completion and exited.

A More Reliable Approach

Our application is more reliable when the Producer and Consumer can deal with their own exceptions. This is not always possible, but it is a good approach for the sample code that we have here.

But we always need to consider what our particular program is doing. I had a great question come up in a recent workshop. The developer asked, "If we get an error in the Consumer, can we just put the item back onto the Channel for it to try again?"

If the application has random hiccups that can be solved by re-processing, then maybe this is a good solution. But there is a problem: what if it isn't a random hiccup?

If the Consumer fails every time it tries to process a particular record, then putting it back on the Channel will only create an endless loop of failures.

One approach that we came up with was to have a separate Channel for errors. So if the Consumer failed to process an item, it would put it on the error channel. This would be processed in a different workflow. For example, the Consumer of the error channel could try to process the item again and then log an error if it was unsuccessful. This would give the application a chance to try again (to handle the random hiccups) and also to log errors for items that could not be processed at all.
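
Here is one way the error channel idea might look. This is a sketch and not code from the sample repository. The "Process" method, the "ErrorConsumer" method, and the failure rules are all hypothetical: item 3 has a one-time hiccup, and item 7 fails every time. The error channel is unbounded here (an assumption) so that the main Consumer never has to wait on it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ErrorChannelSketch
{
    // Hypothetical processing step: item 3 fails on the first attempt only, item 7 always fails.
    static void Process(int item, int attempt)
    {
        if (item == 3 && attempt == 1)
            throw new Exception($"Hiccup on item {item}");
        if (item == 7)
            throw new Exception($"Cannot process item {item}");
    }

    static async Task Consumer(ChannelReader<int> reader, ChannelWriter<int> errors)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            try
            {
                Process(item, attempt: 1);
            }
            catch (Exception)
            {
                // Hand the item off instead of putting it back on the main channel.
                await errors.WriteAsync(item);
            }
        }

        // Once the main channel is drained, no more failures can arrive.
        errors.Complete();
    }

    static async Task ErrorConsumer(ChannelReader<int> errors, List<int> recovered, List<int> failed)
    {
        await foreach (var item in errors.ReadAllAsync())
        {
            try
            {
                // One retry, then give up and log.
                Process(item, attempt: 2);
                recovered.Add(item);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Logged: {ex.Message}");
                failed.Add(item);
            }
        }
    }

    public static async Task<(List<int> Recovered, List<int> Failed)> RunAsync()
    {
        var main = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
        var errors = Channel.CreateUnbounded<int>();
        var recovered = new List<int>();
        var failed = new List<int>();

        Task consumer = Consumer(main.Reader, errors.Writer);
        Task errorConsumer = ErrorConsumer(errors.Reader, recovered, failed);

        // Inline producer to keep the sketch short.
        for (int i = 0; i < 10; i++)
            await main.Writer.WriteAsync(i);
        main.Writer.Complete();

        await Task.WhenAll(consumer, errorConsumer);
        return (recovered, failed);
    }

    public static async Task Main()
    {
        var (recovered, failed) = await RunAsync();
        Console.WriteLine($"Recovered on retry: {string.Join(", ", recovered)}");
        Console.WriteLine($"Failed after retry: {string.Join(", ", failed)}");
    }
}
```

Because a failed item gets exactly one more attempt, an item that fails every time is logged once and dropped, not cycled back through the main channel.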

Wrap Up

As with most code, there are a lot of options. When I'm writing code, I usually spend a lot more time thinking than typing. Working through different scenarios lets me come up with a solution at a high level and figure out how all of the pieces fit together. Typing the code is the easy part.

Happy Coding!
