Monday, February 8, 2021

An Introduction to Channels in C#

Channels give us a way to communicate between concurrent (async) operations in .NET. Channel<T> was included in .NET Core 3.0 (prior to that, it was available as a separate NuGet package). I first encountered channels in Go (golang) several years ago, so I was really interested when I found out that they are available to use in C#.
We can think of a channel as a sort of thread-safe queue. This means that we can add items to a channel in one concurrent operation and read those items in another. (We can also have multiple readers and multiple writers.)

As an introduction to channels, we'll look at an example that mirrors some code from the Go programming language (golang). This will give us an idea of the syntax and how channels work. In a future article, we'll look at a more real-world example.

The Go Example: I have been exploring Go for a while and looking at the similarities and differences compared to C#. You can look at the Go example (and accompanying article) here: Go (golang) Channels - Moving Data Between Concurrent Processes. The article also mentions a little bit about C# channels.

What is Channel<T>?

The purpose of a channel is to let us communicate between 2 (or more) concurrent operations. Like a thread-safe queue, we can add an item to the queue, and we can remove an item from the queue. Since channels are thread-safe, we can have multiple processes writing to and reading from the channel without worrying about missed writes or duplicate reads.
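
Here is a minimal sketch of that idea (this is standalone illustration code using top-level statements, not part of the sample project discussed below): one async operation writes numbers to a channel while another reads them.

C#
    using System;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    var channel = Channel.CreateUnbounded<int>();

    // Writer: add items to the channel, then close it
    var writing = Task.Run(async () =>
    {
      for (int i = 1; i <= 5; i++)
      {
        await channel.Writer.WriteAsync(i);
      }
      channel.Writer.Complete();
    });

    // Reader: take items off of the channel until it is closed and empty
    var reading = Task.Run(async () =>
    {
      await foreach (int item in channel.Reader.ReadAllAsync())
      {
        Console.WriteLine(item);
      }
    });

    await Task.WhenAll(writing, reading);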

Update: I've had several questions regarding the difference between Channel<T> and ConcurrentQueue<T> (or other concurrent collections). The short answer: specialization and optimization. For a longer answer, here's an article: What's the Difference between Channel and ConcurrentQueue in C#?

An Example
Let's imagine that we need to transform 1,000 records and save the resulting data. Due to the nature of the operation, the transformation takes several seconds to complete. In this scenario, we can set up several concurrent processes that transform records. When a transformation is complete, it puts the record onto the channel.

In a separate operation, we have a process that listens to that channel. When an item is available, it takes it off of the channel and saves it to the data store. Since this operation is faster (only taking a few milliseconds), we can have a single operation that is responsible for this.

This is an implementation of the Observer pattern -- at its core, it is a publish-subscribe relationship. The channel takes results from the publisher (the transformation process) and provides them to the subscriber (the saving process).
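
Here is a rough sketch of that shape, assuming the using directives from the sketch above (plus System.Linq). The slow transformation is simulated with Task.Delay, and the "save" is just a Console.WriteLine:

C#
    var channel = Channel.CreateBounded<string>(100);
    var recordIds = Enumerable.Range(1, 1_000);

    // Several concurrent transformers (publishers) write results to the channel
    var transformers = recordIds.Select(id => Task.Run(async () =>
    {
      await Task.Delay(3000); // stand-in for a slow transformation
      await channel.Writer.WriteAsync($"Transformed record {id}");
    })).ToList();

    // Close the channel once all of the transformers have finished
    _ = Task.WhenAll(transformers)
            .ContinueWith(t => channel.Writer.Complete());

    // A single saver (subscriber) reads from the channel and stores each result
    await foreach (string record in channel.Reader.ReadAllAsync())
    {
      Console.WriteLine($"Saving: {record}"); // stand-in for a fast save
    }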

Channel Operations
There are a number of operations available with Channel<T>. The primary operations that we'll look at today include the following: 
  1. Creating a channel
  2. Writing to a channel
  3. Closing a channel when we are done writing to it
  4. Reading from a channel
Channel<T> gives us quite a few ways of accomplishing each of these functions. What we'll see today will give us a taste of what is available, but there is more to explore.

Parallel Operations

The sample project is available on GitHub: https://github.com/jeremybytes/csharp-channels. This project has a service with several end points. One end point provides a single "Person" record based on an ID. This operation has a built-in 1 second delay (to simulate a bit of lag).

Our code is in a console application that gets data -- the goal is to pull 9 records from the service. If we do this sequentially, the operation takes 9 seconds. But if we do this concurrently (i.e., in parallel), the operation takes a little over 1 second.

The concurrent operations that call the service put their results onto a channel. Then we have a listener that reads values off of the channel and displays them in the console.

Creating a Channel

To create a channel, we use static methods on the "Channel" class. The two methods we can choose from are "CreateBounded" and "CreateUnbounded". A bounded channel has a constraint on how many items the channel can hold at one time. If a bounded channel is full, a writer can wait until there is space available in the channel.

Whether we have a bounded or an unbounded channel, we must also specify a type (using a generic type parameter). A channel can only hold one type of object.

Here is the code from our sample (line 61 in the Program.cs file in the project mentioned above):

C#
    var channel = Channel.CreateBounded<Person>(10);

The above code creates a bounded channel that can hold up to 10 "Person" objects. Our sample data only has 9 items, so we will not need to worry about the channel reaching capacity for this example.

Writing to a Channel

A channel has 2 primary properties: a Reader and a Writer. For our channel, the Writer property is a ChannelWriter<Person>. A channel writer has several methods; we'll use the "WriteAsync" method.

Here's the "WriteAsync" method call (line 72 in the Program.cs file):

C#
    await channel.Writer.WriteAsync(person);

Like many things in C#, this method is asynchronous. The "await" will pause the operation until it is complete.

Let's look at the "WriteAsync" method in context. Here is the enclosing section (lines 64 to 81 in the Program.cs file):

C#
    // Write person objects to channel asynchronously
    foreach (var id in ids)
    {
      async Task func(int id)
      {
        try
        {
          var person = await GetPersonAsync(id);
          await channel.Writer.WriteAsync(person);
        }
        catch (HttpRequestException ex)
        {
          Console.WriteLine($"ERROR: ID {id}: {ex.Message}");
        }
      }
      asyncCalls.Add(func(id));
    }

The "foreach" loops through a collection of "ids" (the "id" property of the "Person" object).

Inside the "foreach" loop, we have a local function (named "func" -- not a great name, but it makes the code similar to the anonymous function in the Go example).

The "func" function calls "GetPersonAsync". "GetPersonAsync" calls the service and returns the resulting "person".

After getting the "person", we put that object onto the channel using "Writer.WriteAsync".

The catch block looks for exceptions that might be thrown from the service -- for example, if the service is not available or if it returns an empty object.

The last line of the "foreach" loop does a couple of things. It calls the local "func" function using the "id" value from the loop. "func" is asynchronous and returns a Task.

"asyncCalls" is a "List<Task>". By calling "asyncCalls.Add()", we add the Task from the "func" call to the list. We'll see how this is used later.

There are no blocking operations inside the "foreach" loop. Instead, each iteration of the loop kicks off a concurrent (async) operation. This means that the loop will complete very quickly, and we will have 9 operations running concurrently.

Closing a Channel

When we are done writing to a channel, we should close it. This tells anyone reading from the channel that there will be no more values, and they can stop reading. We'll see why this is important when we look at reading from a channel.

One issue that we have to deal with in our code is that we do not want to close the channel until all 9 of the concurrent operations are complete. This is why we have the "asyncCalls" list. It has a list of all of the operations (Tasks), and we can wait until they are complete.

Here's the code for that (lines 83 to 88 in the Program.cs file):

C#
    // Wait for the async tasks to complete & close the channel
    _ = Task.Run(async () =>
        {
          await Task.WhenAll(asyncCalls);
          channel.Writer.Complete();
        });

"await Task.WhenAll(asyncCalls)" will pause the operation of this section of code until all of the concurrent tasks are complete. When the "await" is done, we know that it is safe to close the channel.

The next line -- "channel.Writer.Complete()" -- closes the channel. This indicates that no new items will be written to the channel.

These 2 lines of code are wrapped in a task (using "Task.Run"). The reason for this is that we want the application to continue without blocking at this point. The "await Task.WhenAll" still pauses until all of the tasks are complete, but it pauses inside a separate concurrent (asynchronous) operation, so the rest of the application keeps moving.

This means that the read operation (in the next section) can start running before all of the concurrent tasks are complete.

If running this section asynchronously is a bit hard to grasp, that's okay. It took me a while to wrap my head around it. For our limited sample (with only 9 records), it does not make any visible difference. But it's a good pattern to get used to for larger, more complex scenarios.

Reading from a Channel

Our last step is to read from a channel -- this uses the "Reader" property from the channel.  For our example, "Reader" is a "ChannelReader<Person>".

There are multiple ways that we can read from the channel. The sample code has an "Option 1" and "Option 2" listed (with one of the options commented out). We'll look at "Option 2" first.

Option: WaitToReadAsync / TryRead
"Option 2" uses a couple of "while" loops to read from the channel. Here is the code (lines 97 to 105 in the Program.cs file):

C#
    // Read person objects from the channel until the channel is closed
    // OPTION 2: Using WaitToReadAsync / TryRead
    while (await channel.Reader.WaitToReadAsync())
    {
      while (channel.Reader.TryRead(out Person person))
      {
        Console.WriteLine($"{person.ID}: {person}");
      }
    }

The first "while" loop makes a call to "WaitToReadAsync" on the channel "Reader" property. When we "await" the "WaitToReadAsync" method, the code will pause until an item is available to read from the channel *or* until the channel is closed.

When an item is available to read, "WaitToReadAsync" returns "true" (so the "while" loop will continue). If the channel is closed, then "WaitToReadAsync" returns "false", and the "while" loop will exit.

So far, this only tells us whether there is an item available to read. We have not done any actual reading. For that, we have the next "while" loop.

The inner "while" loop makes a call to "TryRead" on the channel "Reader" property. As with most "Try" methods, this uses an "out" parameter to hold the value. If the read is successful, then the "person" variable is populated, and the value is written to the console.

"TryRead" will return "false" if there are no items available. If that's the case, then it will exit the inner "while" loop and return to the outer "while" loop to wait for the next available item (or the closing of the channel).

Note: After the channel is closed (meaning the Writer is marked Complete), the Reader will continue to read from the channel until the channel is empty. Once a closed channel is empty, the "WaitToReadAsync" will return false and the loop will exit.
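
Here is a small sketch of that behavior, using an int channel for brevity (and assuming the same using directives as the earlier sketches):

C#
    var channel = Channel.CreateUnbounded<int>();
    channel.Writer.TryWrite(1);
    channel.Writer.TryWrite(2);
    channel.Writer.Complete(); // closed, but 2 items are still buffered

    while (await channel.Reader.WaitToReadAsync()) // true while items remain
    {
      while (channel.Reader.TryRead(out int item))
      {
        Console.WriteLine(item); // prints 1, then 2
      }
    }
    // WaitToReadAsync returned false: the channel is closed *and* empty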

Option: ReadAllAsync
Another option ("Option 1" in the code) is to use "ReadAllAsync". This code is nice because it has fewer steps. But it also uses an IAsyncEnumerable. If you are not familiar with IAsyncEnumerable, then it can be a bit difficult to understand. That's why both options are included.

Let's look at the code that uses "ReadAllAsync" (lines 90 to 95 in the Program.cs file):

C#
    // Read person objects from the channel until the channel is closed
    // OPTION 1: Using ReadAllAsync (IAsyncEnumerable)
    await foreach (Person person in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"{person.ID}: {person}");
    }

"ReadAllAsync" is a method on the channel "Reader" property. This returns an "IAsyncEnumerable<Person>". This combines the power of enumeration ("foreach") with the concurrency of "async".

The "foreach" loop condition looks pretty normal -- we iterate on the result of "ReadAllAsync" and use the "Person" object from that iteration.

What is a little different is that there is an "await" before the "foreach". This means that the "foreach" loop may pause between iterations if it needs to wait for an item to be populated in the channel.

The effect is that the "foreach" will continue to provide values until the channel is closed and empty. Inside the loop, we write the value to the console.

At a high level, this code is a bit easier to understand (since it does not have nested loops and uses a familiar "foreach" loop). But when we look more closely, we see that we "await" the loop, and what returns from "ReadAllAsync" may not be a complete set of data -- our loop may wait for items to be populated in the channel.

I would recommend learning more and getting comfortable with IAsyncEnumerable (which was added with C# 8). It's pretty powerful and, as we've seen here, it can simplify our code. But it only simplifies the code if we understand what it's doing.
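
If IAsyncEnumerable is new to you, here is a minimal example that has nothing to do with channels: an async iterator method that "await foreach" can consume (this assumes the using directives from the earlier sketches, plus System.Collections.Generic):

C#
    // An async iterator: produces values over time with "yield return"
    static async IAsyncEnumerable<int> GetNumbersAsync()
    {
      for (int i = 1; i <= 3; i++)
      {
        await Task.Delay(1000); // simulate waiting for the next value
        yield return i;
      }
    }

    // Consuming it looks just like the channel example above
    await foreach (int number in GetNumbersAsync())
    {
      Console.WriteLine(number);
    }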

Application Output

To run the application, first start the service in the "net-people-service" folder. Navigate to the folder from the command line (I'm using PowerShell here), and type "dotnet run". This starts the service and hosts it at "http://localhost:9874".

Console
    PS C:\Channels\net-people-service> dotnet run
    info: Microsoft.Hosting.Lifetime[0]
        Now listening on: http://localhost:9874
    info: Microsoft.Hosting.Lifetime[0]
        Application started. Press Ctrl+C to shut down.
    info: Microsoft.Hosting.Lifetime[0]
        Hosting environment: Development
    info: Microsoft.Hosting.Lifetime[0]
        Content root path: C:\Channels\net-people-service

To run the application, navigate to the "async" folder (from a new command line) and type "dotnet run".

Console
    PS C:\Channels\async> dotnet run
    [1 2 3 4 5 6 7 8 9]
    9: Isaac Gampu
    3: Turanga Leela
    7: John Sheridan
    4: John Crichton
    6: Laura Roslin
    5: Dave Lister
    2: Dylan Hunt
    1: John Koenig
    8: Dante Montana
    Total Time: 00:00:01.3324302
    PS C:\Channels\async>

There are a couple of things to note about this output. First, notice the "Total Time" at the bottom. This takes a bit over 1 second to complete. As mentioned earlier, each call to the service takes at least 1 second (due to a delay in the service). This shows us that all 9 of these service calls happen concurrently.

Next, notice the order of the output. It is non-deterministic. Even though we have a "foreach" loop that goes through the ids in sequence (1, 2, 3, 4, 5, 6, 7, 8, 9), the output is not in the same order. This is one of the joys of running things in parallel -- the order things complete is not the same as the order things were started.

If we run this application multiple times, the items will be in a different order each time. (Okay, not different *each* time since we are only dealing with 9 values, but we cannot guarantee the order of the results.)

There's Much More

In this article, we've looked at some of the functions of Channel<T>, including creating, writing to, closing, and reading from channels. But Channel<T> has much more functionality.

When we create a channel (using "CreateBounded" or "CreateUnbounded"), we can also include a set of options. These include "SingleReader" and "SingleWriter" to indicate that we will only have one read or write operation at a time. This allows the channel to use an optimized internal implementation. In addition, there is an "AllowSynchronousContinuations" option. This can increase throughput, but at the cost of reduced parallelism.
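
Here is a sketch of what passing those options looks like, using the "Person" type from the sample. The particular values are just for illustration, and "FullMode" (not discussed above) controls what happens when a bounded channel is full:

C#
    var options = new BoundedChannelOptions(capacity: 10)
    {
      SingleWriter = false,                  // we have multiple writers
      SingleReader = true,                   // but only one reader
      AllowSynchronousContinuations = false, // the default
      FullMode = BoundedChannelFullMode.Wait // writers wait when the channel is full
    };
    var channel = Channel.CreateBounded<Person>(options);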

ChannelWriter<T> has several methods in addition to "WriteAsync". These include "WaitToWriteAsync" and "TryWrite" which we may need in scenarios where we have bounded channels and we exceed the capacity. There is also a "TryComplete" method that tries to close the channel.

ChannelReader<T> has many methods as well. Today, we saw "ReadAllAsync", "WaitToReadAsync", and "TryRead". But there is also a "ReadAsync". We can mix and match these methods in different ways depending on what our needs are.

Wrap Up

Channel<T> is really interesting to me. When I started looking at Go (golang) a few years back, I found channels to be clean and easy to use. I wondered if it would make sense to have something similar in C#. Apparently, I was not the only one. Channel<T> uses many of the same concepts as channels in Go, but with a bit more power and functionality. This adds complexity as well (but I've learned to accept complexity in the C# world).

In today's example, we saw how Channel<T> lets us communicate between concurrent operations. In our case, we have 9 concurrent operations that all write their results to a channel. Then we have a reader that pulls the results from the channel and displays them on the console.

This allowed us to see some basics of creating, writing to, closing, and reading from channels. But the real world can get a bit more complicated.

I have been experimenting with channels in my digit recognition project (https://github.com/jeremybytes/digit-display). This uses machine learning to recognize hand-written digits from bitmap data. These operations are computationally expensive, so I've been using tasks with continuations to run them in parallel. But I've also created versions that use channels. This simplifies some things (I don't need to be as concerned about getting back to the UI thread in the desktop application). But it creates issues in other areas (using Parallel.ForEach can end up starving the main application thread). I've got a bit more experimenting to do. The solution that I have right now works, but it doesn't feel quite right.

As always, there is still more to learn.

Happy Coding!

14 comments:

  1. @Jeremy, To make this interesting: what changes would be required if more people get added to the people service (from an administration interface) and we have to process them in the console app? Shall we use Task.Delay and call GetIdAsync with a paged resultset?

    Replies
    1. That is very interesting (and by that, I mean I want to try some code immediately). I spent my entire contemplation time (a.k.a. shower time) considering scenarios.
      (1) Is order important? Right now the result order is indeterminate due to concurrency. We could add something to the "foreach" to chunk the results and maintain a rough grouping.
      (2) Can we preprocess data (meaning, after the first page is done, can we process the second page before the user hits "continue")? If so, we may be able to use the capacity of a bounded channel to pause the processing between pages.
      (3) In Go, I would try using a WaitGroup (https://jeremybytes.blogspot.com/2021/02/go-golang-waitgroup-signal-that.html) to signal when the next page should process. In C#, we may be able to use another channel to communicate from the UI to the async processor to indicate that the next page is requested.
      I will be experimenting with some code, and I will share my results in another article :)

  2. Is this just an alternate to the queues and other classes in the System.Collections.Concurrent namespace?

    Replies
    1. That's sort of what I was wondering. I use the ConcurrentQueue. This does sound cool enough though.

    2. It is more specialized than ConcurrentQueue, and it offers optimization options as well. If you're interested, I ended up writing an article about it: https://jeremybytes.blogspot.com/2021/02/whats-difference-between-channel-and.html

  3. Love to see how Channel can use different transport (like gRPC)

  4. nice article thanks for information

  5. Nice article Jeremy, thank you!

    In the first part of the article you talk about multiple processes using the `Channel` to communicate. Does this imply that `Channel` offers functionality that allows it to be used for inter-process communications, or was "process" used as a stand-in for threads in a single process?

    Replies
    1. I used "process" as a stand-in for "threads". I avoid using "thread" with Task/async because some concurrent operations do not use separate threads. So, terminology gets weird. The multiple readers and writers for a channel are all need to be within the same application.

  6. I think this resembles the DataFlow classes, and the example here could just as easily have been solved using those. (It might even have been easier, since you can set it as complete as soon as you have added all items.)
    There are classes for transformations, aggregations, and more in the DataFlow namespace.

  7. Hi Jeremy,

    Nice post. I have a question: why do you define a function inside the write loop? Can I define this anonymous function outside the loop?

    Replies
    1. Yes, the anonymous function can be declared outside of the loop (it probably should be). I put it inside the loop so that it is similar to an example I have in Go (article: https://jeremybytes.blogspot.com/2021/02/go-golang-anonymous-functions-inlining.html). In the Go example, an anonymous function is declared and run in a single step, so it is inside the loop. In the C# example, things are a bit more complicated because we need to grab the task that comes back from the local function and add it to the task list. This lets us wait until all the async tasks are done. Go has a different approach to concurrency.

      If I were to write code like this (without trying to match another example), I would declare the local function outside of the loop (there is no need to declare it multiple times), and I would give it a better name than "func" :)

    2. I think there is no need for that function at all, as you can simply collect a list of the original tasks (don't await them). It may look like this:

      try
      {
        var tasks = ids.Select(x => GetPersonAsync(x)
          .ContinueWith(t => channel.Writer.WriteAsync(t.Result).AsTask())
          .Unwrap());

        await Task.WhenAll(tasks);
      }
      catch (AggregateException ex)
      {
        // log ex.InnerExceptions
      }
