In response to the previous article (Introduction to Channels in C#), I've received several questions from folks who have been using
  ConcurrentQueue<T> (or another type from Collections.Concurrent). These
  questions boil down to: What's the difference between Channel<T> and
  ConcurrentQueue<T>?
Short answer: Specialization and optimization.
If you're interested in the long answer, keep reading.
Specialization: Separate Reader and Writer Properties
  In
  the last article, we saw that Channel<T> has separate Reader and Writer
  properties (which are ChannelReader<T> and ChannelWriter<T>,
  respectively). But we didn't really take advantage of that in the introductory
  demo.
  Since Channel<T> has separate properties for reading and writing, we can
  create functions that can only read from a channel or
  only write to a channel.
  As an example, we'll look at some code from a naïve machine learning project of
  mine: https://github.com/jeremybytes/digit-display. This recognizes hand-written digits from bitmaps. The process is a bit
  processor intensive (some operations take about 1/3 of a second). The code
  uses concurrent operations to take advantage of all of the cores of my CPU.
  This greatly speeds things up when processing hundreds of digits.
Example: Separate Writer
We'll start with a function that produces results from the input. This is taken from the
  "digit-console-channel" project in the repository mentioned above. I won't
  include line numbers in my references because that code is currently in flux.
  Here is the "Produce" function (from the
  Program.cs file
  in the digit-console-channel project):
C#
    private static async Task Produce(ChannelWriter<Prediction> writer,
        string[] rawData, FSharpFunc<int[], Observation> classifier)
    {
        await Task.Run(() =>
        {
            Parallel.ForEach(rawData, imageString =>
            {
                // some bits skipped here.
                var result = Recognizers.predict(ints, classifier);
                var prediction = new Prediction
                {
                    // some bits skipped here, too.
                };
                writer.WriteAsync(prediction);
            });
        });
        writer.Complete();
    }
  I won't go through all of the details of this code (that's for another time),
  but I will point out a couple of things.
  First, the function takes "ChannelWriter<Prediction>" as a parameter.
  "Prediction" is a custom type that contains things like the predicted value,
  the actual value, and the original image.
  The important thing about the parameter is that it indicates that this
  function will only write to the channel. It will not read from the
  channel. It does not have access to read from the channel.
  Here is how this method is called in the code (from the
  Program.cs file
  in the digit-console-channel project):
C#
    var channel = Channel.CreateUnbounded<Prediction>();
    var listener = Listen(channel.Reader, log);
    var producer = Produce(channel.Writer, rawValidation, classifier);
    await producer;
    await listener;
  In the above code, the first line creates an unbounded channel. Then in the
  third line, we call the "Produce" function and pass in "channel.Writer".
  Instead of passing in the entire channel, we pass in only the part that we
  need -- the "write" functionality.
Example: Separate Reader
  Just like having a function that takes a ChannelWriter as a parameter, we have
  a function that takes a ChannelReader as a parameter.
  Here is the "Listen" function (from the
  Program.cs file
  in the digit-console-channel project):
C#
    private static async Task Listen(ChannelReader<Prediction> reader,
        List<Prediction> log)
    {
        await foreach (Prediction prediction in reader.ReadAllAsync())
        {
            // Display the result
            Console.SetCursorPosition(0, 0);
            WriteOutput(prediction);
            // logging bits have been skipped
        }
    }
  This function takes a "ChannelReader<Prediction>" as a parameter. This
  indicates that this function will only read from the channel. It
  does not have the ability to write to the channel.
  To repeat the code from above, here is where the "Listen" function is called:
C#
    var channel = Channel.CreateUnbounded<Prediction>();
    var listener = Listen(channel.Reader, log);
    var producer = Produce(channel.Writer, rawValidation, classifier);
    await producer;
    await listener;
  The call to "Listen" uses the "channel.Reader" property. Rather than passing
  in the entire channel, we just use the functionality that we need -- the
  ability to read from the channel.
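Since the snippets above skip some bits, here is a minimal, self-contained sketch of the same shape that you can run on its own. The names ("ProduceAsync", "ListenAsync", "ReaderWriterDemo") and the use of squared integers are made up for illustration; they are not from the digit-display project.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReaderWriterDemo
{
    // Hypothetical producer: it only receives the writer, so it cannot read.
    public static async Task ProduceAsync(ChannelWriter<int> writer, int count)
    {
        for (int i = 1; i <= count; i++)
        {
            await writer.WriteAsync(i * i);
        }
        // Tell the reader that no more items are coming.
        writer.Complete();
    }

    // Hypothetical consumer: it only receives the reader, so it cannot write.
    public static async Task<List<int>> ListenAsync(ChannelReader<int> reader)
    {
        var results = new List<int>();
        await foreach (int item in reader.ReadAllAsync())
        {
            results.Add(item);
        }
        return results;
    }

    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        // Start listening first, then start producing.
        Task<List<int>> listener = ListenAsync(channel.Reader);
        Task producer = ProduceAsync(channel.Writer, 5);
        await producer;
        return await listener;
    }

    public static async Task Main()
    {
        List<int> results = await RunAsync();
        Console.WriteLine(string.Join(", ", results)); // 1, 4, 9, 16, 25
    }
}
```

If "ListenAsync" tried to call "reader.WriteAsync(...)", the code would not compile, because ChannelReader<T> has no write methods. That is the whole point of passing in only the half that the function needs.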
Specialization: Closing the Channel
  Another specialization that Channel<T> has is the ability to close the
  channel. I should be a bit more specific here: the terminology of "closing the
  channel" comes from my work with channels in Go (golang). In Go, closing a
  channel indicates that no new items will be written to a channel (writing to a
  closed channel results in a "panic" -- equivalent to an unhandled exception). Items can still be read from a closed channel until the channel is empty.
If you're curious about channels in Go, see "Go (golang) Channels - Moving Data Between Concurrent Processes".
  In C#, we don't "close the channel", we mark the "Writer" as complete --
  meaning, there will be no new items written to the channel. We can still read
  from the channel until all of the items have been read.
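To make that concrete, here is a small sketch (with made-up names and string items, not code from the project) that marks the writer as complete and then checks both halves of the behavior: a later "TryWrite" is rejected, and the items that were already written can still be read.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

public static class CompleteDemo
{
    public static (bool wroteAfterComplete, List<string> drained) Run()
    {
        var channel = Channel.CreateUnbounded<string>();
        channel.Writer.TryWrite("first");
        channel.Writer.TryWrite("second");

        // Mark the writer as complete: no new items will be accepted.
        channel.Writer.Complete();

        // TryWrite reports failure by returning false instead of throwing.
        bool wroteAfterComplete = channel.Writer.TryWrite("third");

        // Items written before Complete() are still available to the reader.
        var drained = new List<string>();
        while (channel.Reader.TryRead(out string item))
        {
            drained.Add(item);
        }
        return (wroteAfterComplete, drained);
    }

    public static void Main()
    {
        var (wroteAfterComplete, drained) = Run();
        Console.WriteLine(wroteAfterComplete);          // False
        Console.WriteLine(string.Join(", ", drained));  // first, second
    }
}
```

Unlike Go, nothing panics here when we use "TryWrite". (Calling "WriteAsync" on a completed writer does fail with a ChannelClosedException, so that is the closer match to Go's behavior.)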
Marking the Writer as Complete
  The ChannelWriter<T> class has a "Complete" method. This is included in
  the "Produce" function that we saw earlier.
C#
    private static async Task Produce(ChannelWriter<Prediction> writer,
        string[] rawData, FSharpFunc<int[], Observation> classifier)
    {
        await Task.Run(() =>
        {
            Parallel.ForEach(rawData, imageString =>
            {
                // some bits skipped here.
                var result = Recognizers.predict(ints, classifier);
                var prediction = new Prediction
                {
                    // some bits skipped here, too.
                };
                writer.WriteAsync(prediction);
            });
        });
        writer.Complete();
    }
  The last line of this function calls "writer.Complete()". This happens outside
  of the "Parallel.ForEach" loop -- after all of the items have been processed and we are done writing to the channel.
Signaling the Reader
  The important aspect of marking the Writer as complete is that it lets the
  Reader know that there will be no new items.
Let's look at the "Listen" function again:
C#
    private static async Task Listen(ChannelReader<Prediction> reader,
        List<Prediction> log)
    {
        await foreach (Prediction prediction in reader.ReadAllAsync())
        {
            // Display the result
            Console.SetCursorPosition(0, 0);
            WriteOutput(prediction);
            // logging bits have been skipped
        }
    }
The core of this function is an "await foreach" loop that iterates using the result from "reader.ReadAllAsync()". As we saw in the previous article, this is an IAsyncEnumerable<T>.
At some point, we need to exit the "foreach" loop. That's why "Complete()" is important. Once the channel writer indicates that there will be no new items, the "foreach" exits after all of the remaining items have been read. (There are more technical bits to how IAsyncEnumerable works, but that's not important to understand this bit of code.)
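If you would like to see the exit condition without the "await foreach" syntax, here is a sketch that does roughly the same work by hand with "WaitToReadAsync" and "TryRead". The class and method names are made up for this example, and I'm only claiming that it behaves like the loop above for this simple case, not that it matches the library's internals line for line.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DrainDemo
{
    // Counts items until the writer is complete and the channel is empty.
    public static async Task<int> CountItemsAsync(ChannelReader<int> reader)
    {
        int count = 0;
        // WaitToReadAsync returns false once the writer has been marked
        // complete and there is nothing left to read. That ends the loop.
        while (await reader.WaitToReadAsync())
        {
            // Pull everything that is currently available.
            while (reader.TryRead(out _))
            {
                count++;
            }
        }
        return count;
    }

    public static async Task<int> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        Task<int> counter = CountItemsAsync(channel.Reader);

        for (int i = 0; i < 3; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        // Without this call, the counter task would wait forever.
        channel.Writer.Complete();

        return await counter;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // 3
    }
}
```

Try commenting out the call to "Complete()": the program never finishes, because the reader has no way to know that the writer is done.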
Optimization: Channel Options
Another difference between Channel<T> and ConcurrentQueue<T> is that Channel<T> has some features to tune the operation.
The "Channel.CreateBounded<T>" and "Channel.CreateUnbounded<T>" functions can take an options object ("BoundedChannelOptions" or "UnboundedChannelOptions"). These options give the library a chance to optimize the operations, for example by picking a channel implementation that needs less synchronization.
Here's a link to the Microsoft documentation for "BoundedChannelOptions": BoundedChannelOptions Class. This gives a bit more detail on what is available.
Let's look at a few of these options.
Option: SingleWriter
The boolean SingleWriter property lets us specify whether we have a single process writing to the channel or if we have multiple processes (the default is "false", meaning multiple).
If we have multiple writers, there is more overhead to make sure that the capacity of a bounded channel is not exceeded and that writers using "WaitToWriteAsync", "WriteAsync", or "TryWrite" are managed appropriately.
If we only have a single writer, then the channel can optimize its internals since it does not need to worry about some of those complexities.
For our example code, we need to allow multiple writers. The Parallel.ForEach loop (in the "Produce" function) runs the loop body concurrently for the items in the loop. The result is that we have multiple processes writing to the channel.
Option: SingleReader
Like SingleWriter, the boolean SingleReader property lets us specify whether we have a single process reading from the channel or if we have multiple processes (the default is "false", meaning multiple).
With multiple readers, there is more overhead when readers are using "WaitToReadAsync", "ReadAsync", or "TryRead".
If we only have a single reader, then the channel can optimize its internals.
For our example code, we only have 1 reader (in the "Listen" function). We could set this option to "true" for our code.
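Here is a sketch of what those two settings could look like for a setup like ours: several writers (from a Parallel.ForEach loop) and one reader. The integer payload and the "OptionsDemo" and "SumAsync" names are stand-ins I made up so that the example runs on its own.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class OptionsDemo
{
    public static async Task<int> RunAsync()
    {
        var options = new UnboundedChannelOptions
        {
            SingleWriter = false, // Parallel.ForEach writes from several threads
            SingleReader = true,  // only one loop reads from the channel
        };
        var channel = Channel.CreateUnbounded<int>(options);

        // Start the single reader first.
        Task<int> reader = SumAsync(channel.Reader);

        await Task.Run(() =>
        {
            Parallel.ForEach(Enumerable.Range(1, 100), n =>
            {
                // TryWrite succeeds on an unbounded channel
                // as long as the writer has not been completed.
                channel.Writer.TryWrite(n);
            });
        });
        channel.Writer.Complete();

        return await reader;
    }

    // The one and only reader: adds up everything it receives.
    private static async Task<int> SumAsync(ChannelReader<int> reader)
    {
        int sum = 0;
        await foreach (int n in reader.ReadAllAsync())
        {
            sum += n;
        }
        return sum;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // 5050
    }
}
```

The items arrive in no particular order because of the parallel loop, which is why this example adds them up rather than printing them. Keep in mind that these flags are promises we make to the channel. If we set "SingleReader = true" and then read from two places at once, we have broken the promise and should not expect correct behavior.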
Option: AllowSynchronousContinuations
Another option is "AllowSynchronousContinuations". I won't pretend to understand the specifics behind this option.
Here is the documentation (ChannelOptions.AllowSynchronousContinuations Property):
Setting this option to true can provide measurable throughput improvements by avoiding scheduling additional work items. However, it may come at the cost of reduced parallelism, as for example a producer may then be the one to execute work associated with a consumer, and if not done thoughtfully, this can lead to unexpected interactions. The default is false.
I'm not exactly sure what scenarios this option is useful for. But it looks like we can get increased throughput at the cost of decreased parallelism.
The main point is that Channel<T> has some tuning that is not available in concurrent collections.
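The bounded options work the same way, with the addition of a capacity and a "FullMode" that says what happens when the channel is full. Here is a short sketch with a capacity of 2 (the number and the "BoundedDemo" name are arbitrary choices for illustration). With "BoundedChannelFullMode.Wait", which is the default, "TryWrite" simply returns false when there is no room.

```csharp
using System;
using System.Threading.Channels;

public static class BoundedDemo
{
    public static (bool first, bool second, bool third, bool afterRead) Run()
    {
        var options = new BoundedChannelOptions(2)
        {
            FullMode = BoundedChannelFullMode.Wait, // the default
            SingleWriter = true,
            SingleReader = true,
        };
        Channel<int> channel = Channel.CreateBounded<int>(options);

        bool first = channel.Writer.TryWrite(1);  // fits
        bool second = channel.Writer.TryWrite(2); // fits, channel is now full
        bool third = channel.Writer.TryWrite(3);  // no room

        // Reading one item frees up a slot.
        channel.Reader.TryRead(out _);
        bool afterRead = channel.Writer.TryWrite(3);

        return (first, second, third, afterRead);
    }

    public static void Main()
    {
        var r = Run();
        // True True False True
        Console.WriteLine($"{r.first} {r.second} {r.third} {r.afterRead}");
    }
}
```

A ConcurrentQueue<T> has no equivalent to this. It just keeps growing.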
Channel<T> is not an Enumerable
Another thing to be aware of when looking at the differences between Channel<T> and ConcurrentQueue<T> (or other concurrent collections) is that Channel<T> is not an IEnumerable<T>. It also does not implement any other collection interfaces (such as ICollection<T>, IReadOnlyCollection<T>, or IList<T>).
IEnumerable<T> is important for 2 reasons.
First, this is important because of LINQ (Language Integrated Query). LINQ provides a set of extension methods on IEnumerable<T> (and IQueryable<T> and some other types) that let us filter, sort, aggregate, group, and transform data in collections.
ConcurrentQueue<T> does implement IEnumerable<T>, so it has all of these extension methods available.
Channel<T> does not implement IEnumerable<T> (the same is true of ChannelReader<T>), so these extension methods are not available.
Second, IEnumerable<T> is important because it allows us to "foreach" through a collection.
ConcurrentQueue<T> does support "foreach".
Channel<T> does not directly support "foreach", but we have seen that ChannelReader<T> has a "ReadAllAsync" method that returns IAsyncEnumerable<T>. So this allows us to use "await foreach" on a channel reader, but it's less direct than using a concurrent collection.
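Here is a side-by-side sketch of that difference. The names are made up for the example. The queue half uses LINQ directly; the channel half has to do the filtering by hand inside an "await foreach" loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class EnumerationDemo
{
    // ConcurrentQueue<T> is an IEnumerable<T>, so LINQ just works.
    public static List<int> EvensFromQueue(ConcurrentQueue<int> queue)
    {
        return queue.Where(n => n % 2 == 0).ToList();
    }

    // ChannelReader<T> is not an IEnumerable<T>; we go through ReadAllAsync.
    public static async Task<List<int>> EvensFromChannelAsync(ChannelReader<int> reader)
    {
        var evens = new List<int>();
        await foreach (int n in reader.ReadAllAsync())
        {
            if (n % 2 == 0)
            {
                evens.Add(n);
            }
        }
        return evens;
    }

    public static async Task Main()
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 6));
        Console.WriteLine(string.Join(", ", EvensFromQueue(queue))); // 2, 4, 6
        Console.WriteLine(queue.Count); // 6

        var channel = Channel.CreateUnbounded<int>();
        foreach (int n in Enumerable.Range(1, 6))
        {
            channel.Writer.TryWrite(n);
        }
        channel.Writer.Complete();
        List<int> evens = await EvensFromChannelAsync(channel.Reader);
        Console.WriteLine(string.Join(", ", evens)); // 2, 4, 6
    }
}
```

There is one more difference hiding in this example: enumerating the queue leaves all 6 items in place, while reading from the channel removes them. Once the "await foreach" finishes, the channel is empty.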
There is no Peek
Update: ChannelReader<T> now has a TryPeek method (along with a CanPeek property that reports whether the channel supports peeking). This lets you look at the next item (if there is one) without removing it from the channel.
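Here is a small sketch of peeking, assuming a runtime that is recent enough to include "TryPeek" (I believe that means .NET 6 or later). Not every channel is required to support it, so the sketch checks "CanPeek" first. The "PeekDemo" name and the value 42 are just for illustration.

```csharp
using System;
using System.Threading.Channels;

public static class PeekDemo
{
    public static (bool canPeek, int peeked, int read) Run()
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(42);

        ChannelReader<int> reader = channel.Reader;
        bool canPeek = reader.CanPeek;

        int peeked = -1; // placeholder in case peeking is not supported
        if (canPeek)
        {
            // Look at the next item without removing it.
            reader.TryPeek(out peeked);
        }

        // The item is still there, so a normal read gets it.
        reader.TryRead(out int read);

        return (canPeek, peeked, read);
    }

    public static void Main()
    {
        var r = Run();
        Console.WriteLine($"CanPeek={r.canPeek}, peeked={r.peeked}, read={r.read}");
    }
}
```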
Which to Use?
So the final question is "Should I use Channel<T> instead of ConcurrentQueue<T>?"
Personally, I like the specialization of Channel<T>, since this is what I generally need -- the ability to produce items in one concurrent operation (or set of operations) and read the output in another concurrent operation.
I like how I can be explicit about the processes that are writing to the channel as opposed to the processes that are reading from the channel. I also like the ability to mark the Writer as "Complete" so that I can stop reading. (This is doable with a ConcurrentQueue, but we would need to write our own signaling code for that.)
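To give an idea of what "our own signaling code" might involve, here is one possible sketch: a ConcurrentQueue<int> wrapped with a "done" flag that the consumer polls. Everything here ("SignaledQueue", "MarkDone", the 1 millisecond delay) is made up for illustration. It is a deliberately simple approach, not a recommendation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class SignaledQueue
{
    private readonly ConcurrentQueue<int> _items = new ConcurrentQueue<int>();
    private bool _done; // hand-rolled stand-in for writer.Complete()

    public void Add(int item) => _items.Enqueue(item);

    // Callers must only call this after the last Add.
    public void MarkDone() => Volatile.Write(ref _done, true);

    public async Task<int> SumAllAsync()
    {
        int sum = 0;
        int item;
        while (true)
        {
            if (_items.TryDequeue(out item))
            {
                sum += item;
                continue;
            }
            if (Volatile.Read(ref _done))
            {
                // Items may have been added between the failed dequeue
                // and the flag check, so drain one last time.
                while (_items.TryDequeue(out item))
                {
                    sum += item;
                }
                return sum;
            }
            // Nothing to read yet and not done: wait a bit and poll again.
            await Task.Delay(1);
        }
    }
}

public static class SignaledQueueDemo
{
    public static async Task<int> RunAsync()
    {
        var queue = new SignaledQueue();
        Task<int> consumer = queue.SumAllAsync();

        await Task.Run(() =>
        {
            Parallel.For(1, 101, n => queue.Add(n));
        });
        queue.MarkDone();

        return await consumer;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // 5050
    }
}
```

It works, but we had to think about the race between the last item and the "done" flag, and the consumer wakes up on a timer instead of being notified. With Channel<T>, "Complete()" and "ReadAllAsync()" take care of both.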
If ConcurrentQueue<T> is working for you, there is no reason to rip it out and change it to Channel<T> unless there are specific features (such as the optimizations) that you need. But for new code, I would recommend considering if Channel<T> will work for you.
I'm still exploring Channel<T> and figuring out how I can adjust my concurrent/parallel patterns to take advantage of it. Be sure to check back. I'll have more articles on what I've learned.
Happy Coding!
Thanks for the follow up article Jeremy.
If you need peek functionality, you should look at System.IO.Pipes; it is similar to channels but allows for a much more nuanced reading of the contents.
In your example with the Parallel.ForEach, the WriteAsync is not awaited. This raises a question: Why use Task.Run if you await nothing and let the Parallel do its thing? Am I missing something?
This code is still a work in progress. The Parallel.ForEach ends up starving the listener that writes the output to the console, so the output is delayed instead of running concurrently with the other processes. The GitHub repo also has a WPF version of this application that works fine (apparently WPF is really good at protecting the main/UI thread). I put the Parallel.ForEach in a Task because it would normally block until the loop is complete. I need the UI to continue processing, and that works in the WPF app. In the console app, I currently have a "MaxDegreeOfParallelism" option set to prevent the starvation. I don't like this solution because it is based on my CPU (I set it to 7 on my 8 core machine, and it works fine. On an older machine (4 core), I need to set it to 3 for it to work.) Adding an "await" to the WriteAsync does not make a difference for this particular issue. I'm still experimenting to find a good solution :)
For the producer and listener to run concurrently, should you use await Task.WhenAll(new [] { producer, listener }) instead?
I debated whether to get into those details in this article, and I decided against it. Here's the flow: (1) I kick off the Listen process so it will start pulling items off of the channel as soon as they become available. (2) I kick off the Produce process to start putting items onto the channel. The items will be pulled off by the already running listener. (3) I await the producer because I don't want to continue until after all of the items are processed. (4) I await the listener because I don't want to continue until after the channel is drained.
Using "await Task.WhenAll" would work fine here. I kind of like the separate awaits because it shows that I expect the producer to finish before the listener, but you should use whatever is most readable to you and your team.