Tuesday, February 9, 2021

What's the Difference between Channel and ConcurrentQueue in C#?

In response to the previous article (Introduction to Channels in C#), I've received several questions from folks who have been using ConcurrentQueue<T> (or another type from the System.Collections.Concurrent namespace). These questions boil down to: What's the difference between Channel<T> and ConcurrentQueue<T>?
Short answer: Specialization and optimization.
If you're interested in the long answer, keep reading.

Specialization: Separate Reader and Writer Properties

In the last article, we saw that Channel<T> has separate Reader and Writer properties (which are ChannelReader<T> and ChannelWriter<T>, respectively). But we didn't really take advantage of that in the introductory demo.

Since Channel<T> has separate properties for reading and writing, we can create functions that can only read from a channel or only write to a channel.

As an example, we'll look at some code from a naïve machine learning project of mine: https://github.com/jeremybytes/digit-display. This project recognizes hand-written digits from bitmaps. The process is a bit processor-intensive (some operations take about 1/3 of a second). The code uses concurrent operations to take advantage of all of the cores of my CPU, which greatly speeds things up when processing hundreds of digits.

Example: Separate Writer
We'll start with a function that produces results from the input. This is taken from the "digit-console-channel" project in the repository mentioned above. I won't include line numbers in my references because that code is currently in flux.

Here is the "Produce" function (from the Program.cs file in the digit-console-channel project):

C#
    private static async Task Produce(ChannelWriter<Prediction> writer,
    string[] rawData, FSharpFunc<int[], Observation> classifier)
    {
      await Task.Run(() =>
      {
        Parallel.ForEach(rawData, imageString =>
        {
          // some bits skipped here.

          var result = Recognizers.predict(ints, classifier);

          var prediction = new Prediction 
          {
            // some bits skipped here, too.
          };

          writer.WriteAsync(prediction);
        });
      });

      writer.Complete();
    }

I won't go through all of the details of this code (that's for another time), but I will point out a couple of things.

First, the function takes "ChannelWriter<Prediction>" as a parameter. "Prediction" is a custom type that contains things like the predicted value, the actual value, and the original image.

The important thing about the parameter is that it indicates that this function will only write to the channel. It will not read from the channel. It does not have access to read from the channel.

Here is how this method is called in the code (from the Program.cs file in the digit-console-channel project):

C#
    var channel = Channel.CreateUnbounded<Prediction>();

    var listener = Listen(channel.Reader, log);
    var producer = Produce(channel.Writer, rawValidation, classifier);

    await producer;
    await listener;

In the above code, the first line creates an unbounded channel. Then in the third line, we call the "Produce" function and pass in "channel.Writer". Instead of passing in the entire channel, we pass in only the part that we need -- the "write" functionality.

Example: Separate Reader
Just as we have a function that takes a ChannelWriter as a parameter, we have a function that takes a ChannelReader as a parameter.

Here is the "Listen" function (from the Program.cs file in the digit-console-channel project):

C#
    private static async Task Listen(ChannelReader<Prediction> reader,
    List<Prediction> log)
    {
      await foreach (Prediction prediction in reader.ReadAllAsync())
      {
        // Display the result
        Console.SetCursorPosition(0, 0);
        WriteOutput(prediction);

        // logging bits have been skipped
      }
    }

This function takes a "ChannelReader<Prediction>" as a parameter. This indicates that this function will only read from the channel. It does not have the ability to write to the channel.

To repeat the code from above, here is where the "Listen" function is called:

C#
    var channel = Channel.CreateUnbounded<Prediction>();

    var listener = Listen(channel.Reader, log);
    var producer = Produce(channel.Writer, rawValidation, classifier);

    await producer;
    await listener;

The call to "Listen" uses the "channel.Reader" property. Rather than passing in the entire channel, we just use the functionality that we need -- the ability to read from the channel.

Specialization: Closing the Channel

Another specialization that Channel<T> has is the ability to close the channel. I should be a bit more specific here: the terminology of "closing the channel" comes from my work with channels in Go (golang). In Go, closing a channel indicates that no new items will be written to a channel (writing to a closed channel results in a "panic" -- equivalent to an unhandled exception). Items can still be read from a closed channel until the channel is empty.

If you're curious about channels in Go, see "Go (golang) Channels - Moving Data Between Concurrent Processes".

In C#, we don't "close the channel", we mark the "Writer" as complete -- meaning, there will be no new items written to the channel. We can still read from the channel until all of the items have been read.
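Here is a minimal sketch of that behavior (my own illustration, separate from the digit-display project), using an unbounded channel of ints: once the writer is marked complete, new writes are refused, but the items already in the channel can still be read until it is drained.

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.Complete();

// Writing after Complete fails: TryWrite returns false
// (awaiting WriteAsync would throw a ChannelClosedException).
bool accepted = channel.Writer.TryWrite(3);
Console.WriteLine(accepted); // False

// Reading still works until the buffered items are drained.
while (channel.Reader.TryRead(out int item))
{
    Console.WriteLine(item); // 1, then 2
}
```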

Marking the Writer as Complete
The ChannelWriter<T> class has a "Complete" method. This is included in the "Produce" function that we saw earlier.

C#
    private static async Task Produce(ChannelWriter<Prediction> writer,
    string[] rawData, FSharpFunc<int[], Observation> classifier)
    {
      await Task.Run(() =>
      {
        Parallel.ForEach(rawData, imageString =>
        {
          // some bits skipped here.

          var result = Recognizers.predict(ints, classifier);

          var prediction = new Prediction 
          {
            // some bits skipped here, too.
          };

          writer.WriteAsync(prediction);
        });
      });

      writer.Complete();
    }

The last line of this function calls "writer.Complete()". This happens outside of the "Parallel.ForEach" loop -- after all of the items have been processed and we are done writing to the channel.

Signaling the Reader
The important aspect of marking the Writer as complete is that it lets the Reader know that there will be no new items.

Let's look at the "Listen" function again:

C#
    private static async Task Listen(ChannelReader<Prediction> reader,
    List<Prediction> log)
    {
      await foreach (Prediction prediction in reader.ReadAllAsync())
      {
        // Display the result
        Console.SetCursorPosition(0, 0);
        WriteOutput(prediction);

        // logging bits have been skipped
      }
    }

The core of this function is a "foreach" loop that iterates using the result from "reader.ReadAllAsync()". As we saw in the previous article, this is an IAsyncEnumerable<T>.

At some point, we need to exit the "foreach" loop. That's why "Complete()" is important. Once the channel writer indicates that there will be no new items, the "foreach" exits after all of the remaining items have been read. (There are more technical bits to how IAsyncEnumerable works, but that's not important to understand this bit of code.)

Optimization: Channel Options

Another difference between Channel<T> and ConcurrentQueue<T> is that Channel<T> has some features to tune the operation.

The "Channel.CreateBounded<T>" and "Channel.CreateUnbounded<T>" functions take an optional parameter ("BoundedChannelOptions" or "UnboundedChannelOptions", respectively). These options give the channel implementation a chance to optimize its operations.

Here's a link to the Microsoft documentation for "BoundedChannelOptions": BoundedChannelOptions Class. This gives a bit more detail on what is available.

Let's look at a few of these options.

Option: SingleWriter
The boolean SingleWriter property lets us specify whether we have a single process writing to the channel or if we have multiple processes (the default is "false", meaning multiple).

If we have multiple writers, there is more overhead to make sure that the capacity of a bounded channel is not exceeded and that writers using "WaitToWriteAsync" or "TryWrite" are managed appropriately.

If we only have a single writer, then the channel implementation can optimize the code since it does not need to worry about some of those complexities.

For our example code, we need to allow multiple writers. The "Parallel.ForEach" loop (in the "Produce" function) spins up concurrent operations for the items in the loop. The result is that we have multiple processes writing to the channel.

Option: SingleReader
Like SingleWriter, the boolean SingleReader property lets us specify whether we have a single process reading from the channel or if we have multiple processes (the default is "false", meaning multiple).

With multiple readers, there is more overhead when readers are using "WaitToReadAsync" or "TryRead".

If we only have a single reader, then the channel implementation can optimize the code.

For our example code, we only have one reader (in the "Listen" function). We could set this option to "true" for our code.
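As a sketch of how these options are applied (my own illustration, not code from the digit-display project; I use int instead of the project's Prediction type to keep it self-contained):

```csharp
using System.Threading.Channels;

// Multiple parallel writers (like Parallel.ForEach in "Produce"),
// but only one reader (like the "Listen" function).
var options = new UnboundedChannelOptions
{
    SingleWriter = false, // several concurrent producers (the default)
    SingleReader = true   // exactly one consumer, so the channel can optimize
};

var channel = Channel.CreateUnbounded<int>(options);
```

"Channel.CreateBounded<T>" works the same way with "BoundedChannelOptions", which additionally takes a capacity and has a "FullMode" property that controls what happens when the channel is full.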

Option: AllowSynchronousContinuations
Another option is "AllowSynchronousContinuations". I won't pretend to understand all of the specifics behind this option, so here is what the Microsoft documentation says:

    "Setting this option to true can provide measurable throughput improvements by avoiding scheduling additional work items. However, it may come at the cost of reduced parallelism, as for example a producer may then be the one to execute work associated with a consumer, and if not done thoughtfully, this can lead to unexpected interactions. The default is false."

I'm not exactly sure which scenarios this option is useful for, but it looks like we can get increased throughput at the cost of decreased parallelism.

The main point is that Channel<T> has some tuning that is not available in concurrent collections.

Channel<T> is not an Enumerable

Another thing to be aware of when looking at the differences between Channel<T> and ConcurrentQueue<T> (or other concurrent collections) is that Channel<T> is not an IEnumerable<T>. It also does not implement any other collection interfaces (such as ICollection<T>, IReadOnlyCollection<T>, or IList<T>).

IEnumerable<T> is important for two reasons.

First, LINQ (Language Integrated Query). LINQ provides a set of extension methods on IEnumerable<T> (and IQueryable<T> and some other types) that let us filter, sort, aggregate, group, and transform data in collections.

ConcurrentQueue<T> does implement IEnumerable<T>, so it has all of these extension methods available.

Channel<T> does not implement IEnumerable<T> (the same is true of ChannelReader<T>), so these extension methods are not available.

Second, IEnumerable<T> is important because it allows us to "foreach" through a collection.

ConcurrentQueue<T> does support "foreach".

Channel<T> does not directly support "foreach", but we have seen that ChannelReader<T> has a "ReadAllAsync" method that returns IAsyncEnumerable<T>. So this allows us to use "await foreach" on a channel reader, but it's less direct than using a concurrent collection.
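To make the difference concrete, here is a small side-by-side sketch (my own example, not from the article's project):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;

// ConcurrentQueue<T> is an IEnumerable<T>: foreach and LINQ work directly.
var queue = new ConcurrentQueue<int>(new[] { 1, 2, 3 });
int queueSum = queue.Sum();  // LINQ just works
Console.WriteLine(queueSum); // 6

// A ChannelReader<T> is not an IEnumerable<T>; we go through
// ReadAllAsync(), which returns an IAsyncEnumerable<T>.
var channel = Channel.CreateUnbounded<int>();
foreach (int i in new[] { 1, 2, 3 }) channel.Writer.TryWrite(i);
channel.Writer.Complete();

int channelSum = 0;
await foreach (int item in channel.Reader.ReadAllAsync())
{
    channelSum += item; // no synchronous LINQ operators here
}
Console.WriteLine(channelSum); // 6
```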

There is no Peek

A functional difference between Channel<T> and ConcurrentQueue<T> is that Channel<T> does not have a "Peek" function. In a queue, "Peek" allows us to look at the next item in the queue without actually removing it from the queue.

In a Channel (more specifically ChannelReader<T>), there is no "Peek" functionality. When we read an item from the channel, it also removes it from the channel.
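A quick sketch of the difference (my own example): ConcurrentQueue<T> has "TryPeek", while ChannelReader<T> only has reads that consume the item.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;

var queue = new ConcurrentQueue<int>();
queue.Enqueue(42);

// ConcurrentQueue<T>: TryPeek looks at the next item without removing it.
if (queue.TryPeek(out int peeked))
    Console.WriteLine($"Peeked {peeked}, count is still {queue.Count}"); // count stays 1

// ChannelReader<T> has no peek; TryRead both returns and removes the item.
var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(42);
channel.Reader.TryRead(out int read);
Console.WriteLine(read);                          // 42 -- and it is now gone
Console.WriteLine(channel.Reader.TryRead(out _)); // False -- nothing left
```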

Which to Use?

So the final question is "Should I use Channel<T> instead of ConcurrentQueue<T>?"

Personally, I like the specialization of Channel<T>, since this is what I generally need -- the ability to produce items in one concurrent operation (or set of operations) and read the output in another concurrent operation.

I like how I can be explicit about the processes that are writing to the channel as opposed to the processes that are reading from the channel. I also like the ability to mark the Writer as "Complete" so that I can stop reading. (This is doable with a ConcurrentQueue, but we would need to write our own signaling code for that.)
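For comparison, one way to get similar "complete" signaling without a channel is BlockingCollection<T>, which wraps a ConcurrentQueue<T> by default (a sketch, my own example; note that BlockingCollection blocks threads rather than awaiting):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// BlockingCollection<T> adds the "no more items" signaling
// that a raw ConcurrentQueue<T> lacks.
using var collection = new BlockingCollection<int>();

var producer = Task.Run(() =>
{
    for (int i = 1; i <= 3; i++) collection.Add(i);
    collection.CompleteAdding(); // analogous to writer.Complete()
});

var consumer = Task.Run(() =>
{
    // GetConsumingEnumerable ends once CompleteAdding has been called
    // and the collection is drained -- similar to ReadAllAsync.
    foreach (int item in collection.GetConsumingEnumerable())
        Console.WriteLine(item);
});

await Task.WhenAll(producer, consumer);
```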

If ConcurrentQueue<T> is working for you, there is no reason to rip it out and change it to Channel<T> unless there are specific features (such as the optimizations) that you need. But for new code, I would recommend considering if Channel<T> will work for you.

I'm still exploring Channel<T> and figuring out how I can adjust my concurrent/parallel patterns to take advantage of it. Be sure to check back. I'll have more articles on what I've learned.

Happy Coding!

6 comments:

  1. Thanks for the follow up article Jeremy.

    ReplyDelete
  2. If you need peek functionality, you should look at System.IO.Pipelines; it is similar to channels but allows for a much more nuanced reading of the contents.

    ReplyDelete
  3. In your example with the Parallel.ForEach, the WriteAsync is not awaited. This raises a question: Why use Task.Run if you await nothing and let the Parallel do its thing? Am I missing something?

    ReplyDelete
    Replies
    1. This code is still a work in progress. The Parallel.ForEach ends up starving the listener that writes the output to the console, so the output is delayed instead of running concurrently with the other processes. The GitHub repo also has a WPF version of this application that works fine (apparently WPF is really good at protecting the main/UI thread). I put the Parallel.ForEach in a Task because it would normally block until the loop is complete. I need the UI to continue processing, and that works in the WPF app. In the console app, I currently have a "MaxDegreeOfParallelism" option set to prevent the starvation. I don't like this solution because it is based on my CPU (I set it to 7 on my 8-core machine, and it works fine; on an older machine (4 cores), I need to set it to 3 for it to work). Adding an "await" to the WriteAsync does not make a difference for this particular issue. I'm still experimenting to find a good solution :)

      Delete
  4. For the producer and listener to run concurrently, should you use await Task.WhenAll(new[] { producer, listener }) instead?

    ReplyDelete
    Replies
    1. I debated whether to get into those details in this article, and I decided against it. Here's the flow: (1) I kick off the Listen process so it will start pulling items off of the channel as soon as they become available. (2) I kick off the Produce process to start putting items onto the channel. The items will be pulled off by the already running listener. (3) I await the producer because I don't want to continue until after all of the items are processed. (4) I await the listener because I don't want to continue until after the channel is drained.

      Using "await Task.WhenAll" would work fine here. I kind of like the separate awaits because it shows that I expect the producer to finish before the listener, but you should use whatever is most readable to you and your team.

      Delete