In the last 2 articles, we looked at a couple of ways to deal with exceptions
that can happen when using the Producer/Consumer pattern along with
Channel<T> in C#. (Prior articles: "Don't Use 'Task.WhenAll' for Interdependent Tasks" and "Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded
Channels".)
Both of these approaches are short-circuiting, meaning that if the Consumer
fails on one item, no further items are processed. The same is true of the
Producer: if the Producer fails on one item, no further items are produced.
This approach may work fine for particular applications. But what if we want
something more robust?
By handling exceptions inside the Producer and Consumer, errors can be logged and processing can continue.
We can go back and look at the logged items to determine which items need to
be redone. This is a more robust approach to exception handling for our sample
application.
Motivation
This article is based on a question sent in by Edington
Watt regarding a presentation that I did about
Channel<T> in C#. He and the WTW ICT Technology team wrote a sample project that showed
strange behavior when exceptions are thrown, and he asked the question "What
are the best exception handling practices in the Consumer Producer approach
using Channel<T>?"
Today we'll look at handling exceptions in a way that does not short-circuit
the process.
Note: The full source code (including the original sample code and various
approaches to fixing the strange behavior is available here: https://github.com/jeremybytes/channel-exceptions.
Articles
- Don't Use "Task.WhenAll" for Interdependent Tasks
- Looking at Producer/Consumer Dependencies: Bounded vs. Unbounded Channels
- Producer/Consumer Exception Handling - A More Reliable Approach (this one)
Interdependent Tasks
We'll go back to the starting place we used in the previous article. Here are
the relevant methods (available in the
Program.cs file of the "refactored" project):
static async Task Main(string[] args)
{
try
{
await ProducerConsumerWithExceptions();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
Console.WriteLine("Done");
}
static async Task ProducerConsumerWithExceptions()
{
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
Task producer = Producer(channel.Writer);
Task consumer = Consumer(channel.Reader);
await Task.WhenAll(producer, consumer);
}
I won't go through this code again here. Check the
prior article for a walkthrough. This code works fine for the success state, but it has strange behavior if
the Consumer throws an exception.
Consumer Exception
If the Consumer throws an exception while processing an item, the entire
application hangs:
Producing something: 0
Producing something: 1
Consuming object: 0
Producing something: 2
Producing something: 3
Producing something: 4
Producing something: 5
Producing something: 6
Producing something: 7
Producing something: 8
Producing something: 9
Producing something: 10
Producing something: 11
Here's a reminder of what happens:
In the "ProducerConsumerWithExceptions" method, we create a Bounded Channel.
This means that the capacity of the channel is limited to 10 items. When the
channel has reached that capacity, no new items can be written to it until
space is made available for them.
The Consumer reads one item off of the channel (index 0) and then throws an
exception. The Consumer then stops, and no further items are read off of the
channel.
The Producer produces data and puts it onto the channel. The first items
produced (index 0) is pulled off of the channel by the Consumer. The Producer
keeps going and puts 10 more items onto the channel (indexes 1 through 10).
Then the Producer produces index 11 and tries to put it onto the channel. At
this point, the channel is at capacity (it has 10 items already), so the
Producer waits (in an async-friendly way) for there to be space to write the
next item.
And this is where the application hangs. The Producer will wait forever. Since
"Task.WhenAll" is waiting for the Producer and Consumer to both finish, it
will also wait forever. The application will not exit on its own.
No Handling of Exceptions in the Producer/Consumer
One cause of the behavior is that the Producer and Consumer do not handle
their own exceptions. They let them bubble up and (hopefully) get caught in
the Main method's try/catch block.
Consumer
Here is the code for the Consumer (in the
"Program.cs" file in the "refactored" project):
static async Task Consumer(ChannelReader<int> reader)
{
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"Consuming object: {item}");
MightThrowExceptionForConsumer(item);
}
}
This uses the "ReadAllAsync" method on the "ChannelWriter<T>" class to
read the items off of the Channel as they become available.
The call to "MightThrowExceptionForConsumer" throws an exception. (Although it
has "might" in the name, it is hard-coded to always throw an exception here.)
And because the exception is unhandled, the "foreach" loop will exit.
This relies on the exception eventually making its way back to the "Main"
method where it will be caught and logged in the catch block.
Producer
The Producer code is similar (in the same "Program.cs" file in the "refactored" project):
static async Task Producer(ChannelWriter<int> writer)
{
try
{
for (int i = 0; i < 20; i++)
{
Console.WriteLine($"Producing something: {i}");
MightThrowExceptionForProducer(i);
await Task.Delay(10);
await writer.WriteAsync(i);
}
}
finally
{
writer.Complete();
}
}
This uses a "for" loop to generate 20 items and write them to the Channel
using "WriteAsync". This could also throw an exception in the
"MightThrowExceptionForProducer" method (although for this sample, it is
hard-coded to not throw an exception).
If an exception is thrown in the "for" loop, the loop will exit before
finishing. And although we do not have a "catch" block to catch an exception,
we do have a "finally" block.
In the "finally" block we mark the ChannelWriter as "Complete". This lets the
Consumer know that no new items will be added to the Channel, and the Consumer
can stop waiting for new items. This is in a "finally" block because even if
we get an exception, we want to make sure that the ChannelWriter is marked
"Complete".
But as with the Consumer, the Producer relies on the exception bubbling up to
the "Main" method where it should be logged in the "catch" block.
Adding Exception Handling to the Producer and Consumer
A more reliable way of dealing with exceptions is to handle them inside the
Producer and the Consumer. This also has the effect of removing the dependency between the Producer and Consumer that we saw earlier. With the dependency removed, we can use a Bounded Channel if we need to, and we can also use "Task.WhenAll" to wait for the Producer and Consumer tasks to complete.
The code for this section is available in the "doesnt-stop"
project.
Consumer
Here is the code for the updated Consumer (in the
"Program.cs" file in the "doesnt-stop" project):
static async Task Consumer(ChannelReader<int> reader)
{
await foreach (var item in reader.ReadAllAsync())
{
try
{
Console.WriteLine($"Consuming object: {item}");
MightThrowExceptionForConsumer(item);
TotalConsumed++;
}
catch (Exception ex)
{
Console.WriteLine($"Logged: {ex.Message}");
}
}
}
Here we have added a try/catch block inside of the foreach loop.
This means that if an exception is thrown, it will be caught and logged here,
and the loop will continue to the next item.
The Consumer is no longer short-circuiting. So we do no longer need to worry
about the Consumer stopping before it has read all of the items from the
Channel.
One other new item is "TotalConsumed++". This is a static variable that is
updated with the total number of items successfully processed (it is after the
"MightThrow..." method). This lets us track how many items were successful in
the Consumer.
Producer
We have something very similar in the Producer (in the "Program.cs" file in the "doesnt-stop" project):
static async Task Producer(ChannelWriter<int> writer)
{
for (int i = 0; i < 100; i++)
{
try
{
Console.WriteLine($"Producing something: {i}");
MightThrowExceptionForProducer(i);
await Task.Delay(10);
TotalProduced++;
await writer.WriteAsync(i);
}
catch (Exception ex)
{
Console.WriteLine($"Logged: {ex.Message}");
}
}
writer.Complete();
}
Inside the "for" loop, we have a try/catch block. So if an exception is
thrown, it is caught and logged, and the loop processing will continue.
Marking the ChannelWriter "Complete" no longer needs to be in a "finally"
block. We will not have any unhandled exceptions escape here (and if we did
get one, it would probably be an unrecoverable one).
One other new item is "TotalProduced++". This is a static variable that
is updated with the total number of items successfully produced (it is after
the "MightThrow..." method). This lets us track how many items were
successful in the Producer.
Throwing Exceptions
To make the output more interesting, the methods that might throw exceptions
use a random number generator to see if an exception should be thrown (in
the "Program.cs" file in the "doesnt-stop" project):
private static void MightThrowExceptionForProducer(int item)
{
if (Randomizer.Next() % 3 == 0)
throw new Exception($"Bad thing happened in Producer ({item})");
}
private static void MightThrowExceptionForConsumer(int item)
{
if (Randomizer.Next() % 50 == 0)
throw new Exception($"Bad thing happened in Consumer ({item})");
}
The Producer throws an exception 1 out of 3 times (approximately). The
Consumer throws an exception 1 out of 50 times (approximately).
Program Output
So lets see how this program behaves. We only modified the Producer and
Consumer code. The original "Main" and "ProducerConsumerWithExceptions" are
unchanged. To give us more of a chance to get random exceptions, I also
changed the code so that it produces 100 items (instead of 20).
Consuming object: 90
Logged: Bad thing happened in Producer (91)
Producing something: 92
Producing something: 93
Consuming object: 92
Producing something: 94
Consuming object: 93
Logged: Bad thing happened in Producer (94)
Producing something: 95
Producing something: 96
Consuming object: 95
Producing something: 97
Consuming object: 96
Producing something: 98
Consuming object: 97
Logged: Bad thing happened in Consumer (97)
Producing something: 99
Consuming object: 98
Consuming object: 99
Total Produced: 69
Total Consumed: 66
Done
This is just the tail end of the output. It shows several logged items (2 Producer errors and 1 Consumer error).
Notice that the Total Produced (69) and Total Consumed (66) do not match. This is because the Producer only produced 69 items (of the 100), so the Consumer had a chance to process at most 69 items. The Consumer also generated a few errors (3), so it is showing 3 fewer items.
"Done" is printed out at the bottom. This lets us know that the application ran to completion and exited.
A More Reliable Approach
Our application is more reliable when the Producer and Consumer can deal with their own exceptions. This is not always possible, but it is a good approach for the sample code that we have here.
But we always need to consider what our particular program is doing. I had a great question come up in a recent workshop. The developer asked,
"If we get an error in the Consumer, can we just put the item back onto the Channel for it to try again?"
If the application has random hiccups that can be solved by re-processing, then maybe this is a good solution. But there is a problem: what if it isn't a random hiccup?
If the Consumer fails every time it tries to process a particular record, then putting it back on the Channel will only create an endless loop of failures.
One approach that we came up with was to have a separate Channel for errors. So if the Consumer failed to process an item, it would put it on the error channel. This would be processed in a different workflow. For example, the Consumer of the error channel could try to process the item again and then log an error if it was unsuccessful. This would give the application a chance to try again (to handle the random hiccups) and also to log errors for items that could not be processed at all.
Wrap Up
As with most code, there are a lot of options. When I'm writing code, I usually spend a lot more time thinking than typing. Working through different scenarios lets me come up with a solution at a high level and figure out how all of the pieces fit together. Typing the code is the easy part.
Happy Coding!