Channels give us a way to communicate between concurrent (async) operations in
.NET. Channel<T> was included in .NET Core 3.0 (prior to that, it was available as a separate NuGet package). I first encountered channels in Go (golang)
several years ago, so I was really interested when I found out that they are
available to use in C#.
We can think of a channel as a sort of thread-safe queue. This means that we
can add items to a channel in one concurrent operation and read those items in
another. (We can also have multiple readers and multiple writers.)
As an introduction to channels, we'll look at an example that mirrors some
code from the Go programming language (golang). This will give us an idea of
the syntax and how channels work. In a future article, we'll look at a more
real-world example.
The Go Example: I have been exploring Go for a while and looking at the
similarities and differences compared to C#. You can look at the Go example
(and accompanying article) here: Go (golang) Channels - Moving Data Between Concurrent Processes. The article also mentions a little bit about C# channels.
What is Channel<T>?
The purpose of a channel is to let us communicate between 2 (or more)
concurrent operations. Like a thread-safe queue, we can add an item to the
queue, and we can remove an item from the queue. Since channels are
thread-safe, we can have multiple processes writing to and reading from the
channel without worrying about missed writes or duplicate reads.
Update: I've had several questions regarding the difference between Channel<T> and ConcurrentQueue<T> (or other concurrent collections). The short answer: specialization and optimization. For a longer answer, here's an article: What's the Difference between Channel and ConcurrentQueue in C#?
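To make the "thread-safe queue" idea concrete, here is a minimal standalone sketch (with made-up values, not code from the sample project): one task writes numbers to a channel, and the main flow reads them back out.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Writer: one concurrent operation adds items, then closes the channel.
var writing = Task.Run(async () =>
{
    for (int i = 1; i <= 3; i++)
    {
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();
});

// Reader: another operation takes items off the channel
// until the channel is closed and empty.
await foreach (int item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine(item);
}
await writing;
```

We'll look at each of these pieces (creating, writing, closing, reading) in detail below.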
An Example
Let's imagine that we need to transform 1,000 records and save the resulting
data. Due to the nature of the operation, the transformation takes several
seconds to complete. In this scenario, we can set up several concurrent
processes that transform records. When a transformation is complete, it puts
the record onto the channel.
In a separate operation, we have a process that listens to that channel. When
an item is available, it takes it off of the channel and saves it to the data
store. Since this operation is faster (only taking a few milliseconds), we can
have a single operation that is responsible for this.
This is an implementation of the Observer pattern -- at its core, it is a
publish-subscribe relationship. The channel takes results from the publisher
(the transformation process) and provides them to the subscriber (the saving
process).
Channel Operations
There are a number of operations available with Channel<T>. The primary
operations that we'll look at today include the following:
- Creating a channel
- Writing to a channel
- Closing a channel when we are done writing to it
- Reading from a channel
Channel<T> gives us quite a few ways of accomplishing each of these
functions. What we'll see today will give us a taste of what is available, but
there is more to explore.
Parallel Operations
The sample project is available on GitHub: https://github.com/jeremybytes/csharp-channels. This project has a service with several endpoints. One endpoint provides
a single "Person" record based on an ID. This operation has a built-in 1
second delay (to simulate a bit of lag).
Our code is in a console application that gets data -- the goal is to pull out
9 records from the service. If we do this sequentially, then the operation
takes 9 seconds. But if we do this concurrently (i.e., in parallel), the
operation takes a little over 1 second.
The concurrent operations that call the service put their results onto a
channel. Then we have a listener that reads values off of the channel and
displays them in the console.
Creating a Channel
To create a channel, we use static methods on the "Channel" class. The two
methods we can choose from are "CreateBounded" and "CreateUnbounded". A
bounded channel has a constraint on how many items the channel can hold at one
time. If a channel is full, a writer can wait until there is space available
in the channel.
Whether we have a bounded or an unbounded channel, we must also specify a type
(using a generic type parameter). A channel can only hold one type of object.
Here is the code from our sample (line 61 in the
Program.cs file
in the project mentioned above):
C#
var channel = Channel.CreateBounded<Person>(10);
The above code creates a bounded channel that can hold up to 10 "Person"
objects. Our sample data only has 9 items, so we will not need to worry about
the channel reaching capacity for this example.
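As a side note, here is a small standalone sketch (not from the sample project) showing what "bounded" means in practice: with a capacity of 2 and the default behavior, "TryWrite" fails when the channel is full, and "WriteAsync" would wait for space.

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(2);

channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);

// The channel is now at capacity.
Console.WriteLine(channel.Writer.TryWrite(3)); // False -- no space available

channel.Reader.TryRead(out int _);             // make room by reading one item
Console.WriteLine(channel.Writer.TryWrite(3)); // True -- space again
```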
Writing to a Channel
A channel has 2 primary properties: a Reader and a Writer. For our channel,
the Writer property is a ChannelWriter<Person>. A channel writer has
several methods; we'll use the "WriteAsync" method.
Here's the "WriteAsync" method call (line 72 in the
Program.cs file):
C#
await channel.Writer.WriteAsync(person);
Like many things in C#, this method is asynchronous. The "await" will pause
the operation until it is complete.
Let's look at the "WriteAsync" method in context. Here is the enclosing
section (lines 64 to 81 in the
Program.cs file):
C#
// Write person objects to channel asynchronously
foreach (var id in ids)
{
    async Task func(int id)
    {
        try
        {
            var person = await GetPersonAsync(id);
            await channel.Writer.WriteAsync(person);
        }
        catch (HttpRequestException ex)
        {
            Console.WriteLine($"ERROR: ID {id}: {ex.Message}");
        }
    }
    asyncCalls.Add(func(id));
}
The "foreach" loops through a collection of "ids" (values matching the "ID"
property of the "Person" objects).
Inside the "foreach" loop, we have a local function (named "func" -- not a
great name, but it makes the code similar to the anonymous function in the Go
example).
The "func" function calls "GetPersonAsync". "GetPersonAsync" calls the service
and returns the resulting "person".
After getting the "person", we put that object onto the channel using
"Writer.WriteAsync".
The catch block is looking for exceptions that might be thrown from the
service -- for example if the service is not available or the service returns
an empty object.
The last line of the "foreach" loop does a couple of things. It calls the
local "func" function using the "id" value from the loop. "func" is
asynchronous and returns a Task.
"asyncCalls" is a "List<Task>". By calling "asyncCalls.Add()", we add
the Task from the "func" call to the list. We'll see how this is used later.
There are no blocking operations inside the "foreach" loop. Instead, each
iteration of the loop kicks off a concurrent (async) operation. This means
that the loop will complete very quickly, and we will have 9 operations
running concurrently.
Closing a Channel
When we are done writing to a channel, we should close it. This tells anyone
reading from the channel that there will be no more values, and they can stop
reading. We'll see why this is important when we look at reading from a
channel.
One issue that we have to deal with in our code is that we do not want to
close the channel until all 9 of the concurrent operations are complete. This
is why we have the "asyncCalls" list. It has a list of all of the operations
(Tasks), and we can wait until they are complete.
Here's the code for that (lines 83 to 88 in the
Program.cs file):
C#
// Wait for the async tasks to complete & close the channel
_ = Task.Run(async () =>
{
    await Task.WhenAll(asyncCalls);
    channel.Writer.Complete();
});
"await Task.WhenAll(asyncCalls)" will pause the operation of this section of
code until all of the concurrent tasks are complete. When the "await" is done,
we know that it is safe to close the channel.
The next line -- "channel.Writer.Complete()" -- closes the channel. This
indicates that no new items will be written to the channel.
These 2 lines of code are wrapped in a task (using "Task.Run"). The reason for
this is that we want the application to continue without blocking at this
point. The "await Task.WhenAll" still waits, but it waits inside a separate
concurrent (asynchronous) operation, so the main flow of the application is
not held up.
This means that the read operation (in the next section) can start running
before all of the concurrent tasks are complete.
If running this section asynchronously is a bit hard to grasp, that's okay. It
took me a while to wrap my head around it. For our limited sample (with only 9
records), it does not make any visible difference. But it's a good pattern to
get used to for larger, more complex scenarios.
Reading from a Channel
Our last step is to read from a channel -- this uses the "Reader" property
from the channel. For our example, "Reader" is a
"ChannelReader<Person>".
There are multiple ways that we can read from the channel. The sample code has
an "Option 1" and "Option 2" listed (with one of the options commented out).
We'll look at "Option 2" first.
Option: WaitToReadAsync / TryRead
"Option 2" uses a couple of "while" loops to read from the channel. Here is
the code (lines 97 to 105 in the
Program.cs file):
C#
// Read person objects from the channel until the channel is closed
// OPTION 2: Using WaitToReadAsync / TryRead
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out Person person))
    {
        Console.WriteLine($"{person.ID}: {person}");
    }
}
The first "while" loop makes a call to "WaitToReadAsync" on the channel
"Reader" property. When we "await" the "WaitToReadAsync" method, the code will
pause until an item is available to read from the channel *or* until the
channel is closed.
When an item is available to read, "WaitToReadAsync" returns "true" (so the
"while" loop will continue). If the channel is closed, then "WaitToReadAsync"
returns "false", and the "while" loop will exit.
So far, this only tells us whether there is an item available to read. We have
not done any actual reading. For that, we have the next "while" loop.
The inner "while" loop makes a call to "TryRead" on the channel "Reader"
property. As with most "Try" methods, this uses an "out" parameter to hold the
value. If the read is successful, then the "person" variable is populated, and
the value is written to the console.
"TryRead" will return "false" if there are no items available. If that's the
case, then it will exit the inner "while" loop and return to the outer "while"
loop to wait for the next available item (or the closing of the channel).
Note: After the channel is closed (meaning the Writer is marked Complete), the Reader will continue to read from the channel until the channel is empty. Once a closed channel is empty, the "WaitToReadAsync" will return false and the loop will exit.
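A small standalone sketch (with made-up values) of this draining behavior:

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.Complete(); // close the channel with items still in it

// Reads still succeed until the channel is empty...
while (channel.Reader.TryRead(out int item))
{
    Console.WriteLine(item);
}

// ...then WaitToReadAsync reports that no more items are coming.
Console.WriteLine(await channel.Reader.WaitToReadAsync()); // False
```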
Option: ReadAllAsync
Another option ("Option 1" in the code) is to use "ReadAllAsync". This code is
nice because it has fewer steps. But it also uses an IAsyncEnumerable. If you
are not familiar with IAsyncEnumerable, then it can be a bit difficult to
understand. That's why both options are included.
Let's look at the code that uses "ReadAllAsync" (lines 90 to 95 in the
Program.cs file):
C#
// Read person objects from the channel until the channel is closed
// OPTION 1: Using ReadAllAsync (IAsyncEnumerable)
await foreach (Person person in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"{person.ID}: {person}");
}
"ReadAllAsync" is a method on the channel "Reader" property. This returns an
"IAsyncEnumerable<Person>". This combines the power of enumeration
("foreach") with the concurrency of "async".
The "foreach" loop condition looks pretty normal -- we iterate on the result
of "ReadAllAsync" and use the "Person" object from that iteration.
What is a little different is that there is an "await" before the "foreach".
This means that the "foreach" loop may pause between iterations if it needs to
wait for an item to be populated in the channel.
The effect is that the "foreach" will continue to provide values until the
channel is closed and empty. Inside the loop, we write the value to the console.
At a high level, this code is a bit easier to understand (since it does not
have nested loops and uses a familiar "foreach" loop). But when we look more
closely, we see that we "await" the loop, and what returns from "ReadAllAsync"
may not be a complete set of data -- our loop may wait for items to be
populated in the channel.
I would recommend learning more and getting comfortable with IAsyncEnumerable
(which was added with C# 8). It's pretty powerful and, as we've seen here, it
can simplify our code. But it only simplifies the code if we understand what
it's doing.
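If you want to see IAsyncEnumerable on its own (outside of channels), here is a small sketch of an async iterator -- a hand-rolled IAsyncEnumerable built with "yield return". The method name and delay are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

await foreach (int value in CountSlowlyAsync(3))
{
    Console.WriteLine(value);
}

// An async iterator: producing each item may involve an asynchronous
// wait, and the consuming "await foreach" pauses between iterations.
static async IAsyncEnumerable<int> CountSlowlyAsync(int max)
{
    for (int i = 1; i <= max; i++)
    {
        await Task.Delay(100); // simulate waiting for a value
        yield return i;
    }
}
```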
Application Output
To run the application, first start the service in the "net-people-service"
folder. Navigate to the folder from the command line (I'm using PowerShell here), and type "dotnet run".
This starts the service and hosts it at "http://localhost:9874".
Console
PS C:\Channels\net-people-service> dotnet run
info: Microsoft.Hosting.Lifetime[0]
      Now listening on: http://localhost:9874
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: C:\Channels\net-people-service
To run the application, navigate to the "async" folder (from a new command line)
and type "dotnet run".
Console
PS C:\Channels\async> dotnet run
[1 2 3 4 5 6 7 8 9]
9: Isaac Gampu
3: Turanga Leela
7: John Sheridan
4: John Crichton
6: Laura Roslin
5: Dave Lister
2: Dylan Hunt
1: John Koenig
8: Dante Montana
Total Time: 00:00:01.3324302
PS C:\Channels\async>
There are a couple of things to note about this output. First, notice the "Total Time" at the bottom. This takes a bit over 1 second to complete. As mentioned earlier, each call to the service takes at least 1 second (due to a delay in the service). This shows us that all 9 of these service calls happen concurrently.
Next, notice the order of the output. It is non-deterministic. Even though we have a "foreach" loop that goes through the ids in sequence (1, 2, 3, 4, 5, 6, 7, 8, 9), the output is not in the same order. This is one of the joys of running things in parallel -- the order things complete is not the same as the order things were started.
If we run this application multiple times, the items will be in a different order each time. (Okay, not different *each* time since we are only dealing with 9 values, but we cannot guarantee the order of the results.)
There's Much More
In this article, we've looked at some of the functions of Channel<T>,
including creating, writing to, closing, and reading from channels. But
Channel<T> has much more functionality.
When we create a channel (using "CreateBounded" or "CreateUnbounded"), we can
also include a set of options. These include "SingleReader" and "SingleWriter"
to indicate that we will only have one read or write operation at a time. This
allows the channel implementation to be optimized for that usage. In addition,
there is an "AllowSynchronousContinuations" option. This can increase
throughput at the cost of some parallelism.
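For example, creating a bounded channel with options might look something like this (a sketch -- the option values here are arbitrary choices for illustration, not recommendations):

```csharp
using System.Threading.Channels;

var options = new BoundedChannelOptions(capacity: 10)
{
    SingleReader = true,   // only one read operation at a time
    SingleWriter = false,  // multiple concurrent writers
    AllowSynchronousContinuations = false,
    FullMode = BoundedChannelFullMode.Wait // writers wait when the channel is full
};
var channel = Channel.CreateBounded<string>(options);
```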
ChannelWriter<T> has several methods in addition to "WriteAsync". These
include "WaitToWriteAsync" and "TryWrite" which we may need in scenarios where
we have bounded channels and we exceed the capacity. There is also a
"TryComplete" method that tries to close the channel.
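Here is a sketch of how "WaitToWriteAsync" and "TryWrite" might work together for a bounded channel (the local function name is made up):

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(2);

// Returns true if the value was written,
// or false if the channel closed before we could write.
async Task<bool> TryWriteWhenReadyAsync(ChannelWriter<int> writer, int value)
{
    while (await writer.WaitToWriteAsync())
    {
        if (writer.TryWrite(value))
        {
            return true;
        }
        // Another writer took the space; wait for room again.
    }
    return false; // the channel was closed
}

bool written = await TryWriteWhenReadyAsync(channel.Writer, 42);
```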
ChannelReader<T> has many methods as well. Today, we saw "ReadAllAsync",
"WaitToReadAsync", and "TryRead". But there is also a "ReadAsync". We can mix
and match these methods in different ways depending on what our needs are.
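For instance, "ReadAsync" waits for the next item and throws a "ChannelClosedException" if the channel is closed and empty. A standalone sketch (with made-up values):

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1);
channel.Writer.Complete();

try
{
    while (true)
    {
        int item = await channel.Reader.ReadAsync();
        Console.WriteLine(item);
    }
}
catch (ChannelClosedException)
{
    // The channel is closed and empty -- no more items are coming.
}
```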
Wrap Up
Channel<T> is really interesting to me. When I started looking at Go
(golang) a few years back, I found channels to be clean and easy to use. I
wondered if it would make sense to have something similar in C#. Apparently, I
was not the only one. Channel<T> uses many of the same concepts as
channels in Go, but with a bit more power and functionality. This adds
complexity as well (but I've learned to accept complexity in the C# world).
In today's example, we saw how Channel<T> lets us communicate between
concurrent operations. In our case, we have 9 concurrent operations that all
write their results to a channel. Then we have a reader that pulls the results
from the channel and displays them on the console.
This allowed us to see some basics of creating, writing to, closing, and
reading from channels. But the real world can get a bit more complicated.
I have been experimenting with channels in my digit recognition project (https://github.com/jeremybytes/digit-display). This uses machine learning to recognize hand-written digits from bitmap
data. These operations are computationally expensive, so I've been using tasks
with continuations to run them in parallel. But I've also created versions
that use channels. This simplifies some things (I don't need to be as
concerned about getting back to the UI thread in the desktop application). But
it creates issues in others (using a Parallel.ForEach can end up starving the
main application thread). I've got a bit more experimenting to do. The
solution that I have right now works, but it doesn't feel quite right.
As always, there is still more to learn.
Happy Coding!
@Jeremy, to make this interesting: what changes would be required if more people get added to the people service from an Administration interface and we have to process them in the console app? Shall we use Task.Delay and call GetIdAsync with a paged resultset?
That is very interesting (and by that, I mean I want to try some code immediately). I spent my entire contemplation time (a.k.a. shower time) considering scenarios.
(1) Is order important? Right now the result order is indeterminate due to concurrency. We could add something to the "foreach" to chunk the results and maintain a rough grouping.
(2) Can we preprocess data (meaning, after the first page is done, can we process the second page before the user hits "continue")? If so, we may be able to use the capacity of a bounded channel to pause the processing between pages.
(3) In Go, I would try using a WaitGroup (https://jeremybytes.blogspot.com/2021/02/go-golang-waitgroup-signal-that.html) to signal when the next page should process. In C#, we may be able to use another channel to communicate from the UI to the async processor to indicate that the next page is requested.
I will be experimenting with some code, and I will share my results in another article :)
Is this just an alternate to the queues and other classes in the System.Collections.Concurrent namespace?
That's sort of what I was wondering. I use the ConcurrentQueue. This does sound cool enough though.
It is more specialized than ConcurrentQueue, and it offers optimization options as well. If you're interested, I ended up writing an article about it: https://jeremybytes.blogspot.com/2021/02/whats-difference-between-channel-and.html
Would love to see how Channel can use a different transport (like gRPC).
Nice article, thanks for the information.
ReplyDeleteNice article Jeremy, thank you!
In the first part of the article you talk about multiple processes using the `Channel` to communicate. Does this imply that `Channel` offers functionality that allows it to be used for inter-process communications, or was "process" used as a stand-in for threads in a single process?
I used "process" as a stand-in for "threads". I avoid using "thread" with Task/async because some concurrent operations do not use separate threads. So, terminology gets weird. The multiple readers and writers for a channel all need to be within the same application.
Perfect, thanks!
I think this resembles the DataFlow classes, and the example here could as easily have been solved using those. (It might even have been easier since you can set it as complete as soon as you have added all items.)
ReplyDeleteThere are classes for transformations, aggregations and more in the DataFlow namespace.
Hi Jeremy,
Nice post. I have a question: why do you define a function inside the write loop? Can I define this anonymous function outside the loop?
Yes, the anonymous function can be declared outside of the loop (it probably should be). I put it inside the loop so that it is similar to an example I have in Go (article: https://jeremybytes.blogspot.com/2021/02/go-golang-anonymous-functions-inlining.html). In the Go example, an anonymous function is declared and run in a single step, so it is inside the loop. In the C# example, things are a bit more complicated because we need to grab the task that comes back from the local function and add it to the task list. This lets us wait until all the async tasks are done. Go has a different approach to concurrency.
If I were to write code like this (without trying to match another example), I would declare the local function outside of the loop (there is no need to declare it multiple times), and I would give it a better name than "func" :)
I think there is no need for that function at all, as you can simply collect a list of the original tasks (don't await them). It may look like this:
try
{
    var tasks = ids.Select(x => GetPersonAsync(x).ContinueWith(t =>
        channel.Writer.WriteAsync(t.Result)).Unwrap());
    await Task.WhenAll(tasks);
}
catch (AggregateException ex)
{
    // log ex.InnerExceptions
}