Tuesday, February 9, 2021

What's the Difference between Channel and ConcurrentQueue in C#?

In response to the previous article (Introduction to Channels in C#), I've received several questions from folks who have been using ConcurrentQueue<T> (or another type from System.Collections.Concurrent). These questions boil down to: What's the difference between Channel<T> and ConcurrentQueue<T>?
Short answer: Specialization and optimization.
If you're interested in the long answer, keep reading.

Specialization: Separate Reader and Writer Properties

In the last article, we saw that Channel<T> has separate Reader and Writer properties (which are ChannelReader<T> and ChannelWriter<T>, respectively). But we didn't really take advantage of that in the introductory demo.

Since Channel<T> has separate properties for reading and writing, we can create functions that can only read from a channel or only write to a channel.

As an example, we'll look at some code from a naïve machine learning project of mine: https://github.com/jeremybytes/digit-display. This recognizes hand-written digits from bitmaps. The process is a bit processor intensive (some operations take about 1/3 of a second). The code uses concurrent operations to take advantage of all of the cores of my CPU. This greatly speeds things up when processing hundreds of digits.

Example: Separate Writer
We'll start with a function that produces results from the input. This is taken from the "digit-console-channel" project in the repository mentioned above. I won't include line numbers in my references because that code is currently in flux.

Here is the "Produce" function (from the Program.cs file in the digit-console-channel project):

C#
    private static async Task Produce(ChannelWriter<Prediction> writer,
    string[] rawData, FSharpFunc<int[], Observation> classifier)
    {
      await Task.Run(() =>
      {
        Parallel.ForEach(rawData, imageString =>
        {
          // some bits skipped here.

          var result = Recognizers.predict(ints, classifier);

          var prediction = new Prediction 
          {
            // some bits skipped here, too.
          };

          writer.WriteAsync(prediction);
        });
      });

      writer.Complete();
    }

I won't go through all of the details of this code (that's for another time), but I will point out a couple of things.

First, the function takes "ChannelWriter<Prediction>" as a parameter. "Prediction" is a custom type that contains things like the predicted value, the actual value, and the original image.

The important thing about the parameter is that it indicates that this function will only write to the channel. It will not read from the channel. It does not have access to read from the channel.

Here is how this method is called in the code (from the Program.cs file in the digit-console-channel project):

C#
    var channel = Channel.CreateUnbounded<Prediction>();

    var listener = Listen(channel.Reader, log);
    var producer = Produce(channel.Writer, rawValidation, classifier);

    await producer;
    await listener;

In the above code, the first line creates an unbounded channel. Then in the third line, we call the "Produce" function and pass in "channel.Writer". Instead of passing in the entire channel, we pass in only the part that we need -- the "write" functionality.

Example: Separate Reader
Just like having a function that takes a ChannelWriter as a parameter, we have a function that takes a ChannelReader as a parameter.

Here is the "Listen" function (from the Program.cs file in the digit-console-channel project):

C#
    private static async Task Listen(ChannelReader<Prediction> reader,
    List<Prediction> log)
    {
      await foreach (Prediction prediction in reader.ReadAllAsync())
      {
        // Display the result
        Console.SetCursorPosition(0, 0);
        WriteOutput(prediction);

        // logging bits have been skipped
      }
    }

This function takes a "ChannelReader<Prediction>" as a parameter. This indicates that this function will only read from the channel. It does not have the ability to write to the channel.

To repeat the code from above, here is where the "Listen" function is called:

C#
    var channel = Channel.CreateUnbounded<Prediction>();

    var listener = Listen(channel.Reader, log);
    var producer = Produce(channel.Writer, rawValidation, classifier);

    await producer;
    await listener;

The call to "Listen" uses the "channel.Reader" property. Rather than passing in the entire channel, we just use the functionality that we need -- the ability to read from the channel.
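The functions above depend on the digit recognition project, so here is a minimal self-contained sketch of the same shape. The names "Produce" and "Listen" mirror the article's functions, but the bodies are hypothetical stand-ins (the producer writes the squares of 1 through 5 rather than predictions). The point is the parameter types: one function can only write, the other can only read.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
var received = new List<int>();

// Start the reader first, then the writer (same order as the code above).
Task listener = Listen(channel.Reader, received);
Task producer = Produce(channel.Writer, 5);

await producer;
await listener;

Console.WriteLine(string.Join(", ", received)); // prints "1, 4, 9, 16, 25"

// Write-only: this function is handed a ChannelWriter<int>,
// so it has no way to read from the channel.
static async Task Produce(ChannelWriter<int> writer, int count)
{
    for (int i = 1; i <= count; i++)
    {
        await writer.WriteAsync(i * i);
    }

    // Signal that no more items will be written.
    writer.Complete();
}

// Read-only: this function is handed a ChannelReader<int>,
// so it has no way to write to the channel.
static async Task Listen(ChannelReader<int> reader, List<int> results)
{
    await foreach (int value in reader.ReadAllAsync())
    {
        results.Add(value);
    }
}
```

If we tried to call "reader.WriteAsync" inside "Listen", the code would not compile, since ChannelReader<T> has no write methods.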

Specialization: Closing the Channel

Another specialization that Channel<T> has is the ability to close the channel. I should be a bit more specific here: the terminology of "closing the channel" comes from my work with channels in Go (golang). In Go, closing a channel indicates that no new items will be written to a channel (writing to a closed channel results in a "panic" -- equivalent to an unhandled exception). Items can still be read from a closed channel until the channel is empty.

If you're curious about channels in Go, see "Go (golang) Channels - Moving Data Between Concurrent Processes".

In C#, we don't "close the channel", we mark the "Writer" as complete -- meaning, there will be no new items written to the channel. We can still read from the channel until all of the items have been read.
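As a small sketch of that behavior (the string values are made up for illustration): once the Writer is marked complete, "TryWrite" reports that the new item was not accepted, but the items that were already in the channel can still be read.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<string>();

channel.Writer.TryWrite("first");
channel.Writer.TryWrite("second");

// Mark the Writer as complete: no new items will be accepted.
channel.Writer.Complete();

// TryWrite returns false instead of adding the item.
bool acceptedAfterComplete = channel.Writer.TryWrite("third");

// The items written before Complete() are still available to read.
var drained = new List<string>();
while (channel.Reader.TryRead(out var item))
{
    drained.Add(item);
}

Console.WriteLine($"Write after Complete accepted: {acceptedAfterComplete}");
Console.WriteLine($"Items read after Complete: {string.Join(", ", drained)}");
```

Unlike Go, where writing to a closed channel panics, "TryWrite" simply returns false here. ("WriteAsync" on a completed channel does throw a ChannelClosedException.)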

Marking the Writer as Complete
The ChannelWriter<T> class has a "Complete" method. This is included in the "Produce" function that we saw earlier.

C#
    private static async Task Produce(ChannelWriter<Prediction> writer,
    string[] rawData, FSharpFunc<int[], Observation> classifier)
    {
      await Task.Run(() =>
      {
        Parallel.ForEach(rawData, imageString =>
        {
          // some bits skipped here.

          var result = Recognizers.predict(ints, classifier);

          var prediction = new Prediction 
          {
            // some bits skipped here, too.
          };

          writer.WriteAsync(prediction);
        });
      });

      writer.Complete();
    }

The last line of this function calls "writer.Complete()". This happens outside of the "Parallel.ForEach" loop -- after all of the items have been processed and we are done writing to the channel.

Signaling the Reader
The important aspect of marking the Writer as complete is that it lets the Reader know that there will be no new items.

Let's look at the "Listen" function again:

C#
    private static async Task Listen(ChannelReader<Prediction> reader,
    List<Prediction> log)
    {
      await foreach (Prediction prediction in reader.ReadAllAsync())
      {
        // Display the result
        Console.SetCursorPosition(0, 0);
        WriteOutput(prediction);

        // logging bits have been skipped
      }
    }

The core of this function is a "foreach" loop that iterates using the result from "reader.ReadAllAsync()". As we saw in the previous article, this is an IAsyncEnumerable<T>.

At some point, we need to exit the "foreach" loop. That's why "Complete()" is important. Once the channel writer indicates that there will be no new items, the "foreach" exits after all of the remaining items have been read. (There are more technical bits to how IAsyncEnumerable works, but that's not important to understand this bit of code.)

Optimization: Channel Options

Another difference between Channel<T> and ConcurrentQueue<T> is that Channel<T> has some features to tune the operation.

The "Channel.CreateBounded<T>" and "Channel.CreateUnbounded<T>" functions take an optional parameter ("BoundedChannelOptions" or "UnboundedChannelOptions"). These options give the channel implementation a chance to optimize its operations.

Here's a link to the Microsoft documentation for "BoundedChannelOptions": BoundedChannelOptions Class. This gives a bit more detail on what is available.

Let's look at a few of these options.

Option: SingleWriter
The boolean SingleWriter property lets us specify whether we have a single process writing to the channel or if we have multiple processes (the default is "false", meaning multiple).

If we have multiple writers, there is more overhead to make sure that the capacity of a bounded channel is not exceeded and that writers using "WaitToWriteAsync" or "TryWrite" are managed appropriately.

If we only have a SingleWriter, then the channel implementation can optimize its internal operations since it does not need to worry about some of those complexities.

For our example code, we need to allow multiple writers. The "Parallel.ForEach" loop (in the "Produce" function) spins up concurrent operations for the items in the loop. The result is that we have multiple processes writing to the channel.

Option: SingleReader
Like SingleWriter, the boolean SingleReader property lets us specify whether we have a single process reading from the channel or if we have multiple processes (the default is "false", meaning multiple).

With multiple readers, there is more overhead when readers are using "WaitToReadAsync" or "TryRead".

If we only have a SingleReader, then the channel implementation can optimize its internal operations.

For our example code, we only have 1 reader (in the "Listen" function). We could set this option to "true" for our code.
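Here is a sketch of what setting these options could look like. This is not the code from the digit recognition project; the "CountItems" function and the 100 integer items are hypothetical stand-ins. It keeps "SingleWriter" as false (several threads write) and sets "SingleReader" to true (one loop reads).

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var options = new UnboundedChannelOptions
{
    SingleWriter = false, // Parallel.For below writes from several threads
    SingleReader = true   // only one loop reads from the channel
};
var channel = Channel.CreateUnbounded<int>(options);

// One reader, started before the writers.
Task<int> listener = CountItems(channel.Reader);

// Multiple concurrent writers. TryWrite on an unbounded channel
// succeeds as long as the Writer has not been marked complete.
Parallel.For(0, 100, i => channel.Writer.TryWrite(i));
channel.Writer.Complete();

int total = await listener;
Console.WriteLine($"Items read: {total}");

// Hypothetical reader that just counts what it receives.
static async Task<int> CountItems(ChannelReader<int> reader)
{
    int count = 0;
    await foreach (int item in reader.ReadAllAsync())
    {
        count++;
    }
    return count;
}
```

These options are a promise that we make to the channel. If we set "SingleReader" to true and then read from several places at once, the behavior is not something we should rely on.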

Option: AllowSynchronousContinuations
Another option is "AllowSynchronousContinuations". I won't pretend to understand the specifics behind this option.

The Microsoft documentation describes it this way: "Setting this option to true can provide measurable throughput improvements by avoiding scheduling additional work items. However, it may come at the cost of reduced parallelism, as for example a producer may then be the one to execute work associated with a consumer, and if not done thoughtfully, this can lead to unexpected interactions. The default is false."
I'm not exactly sure what scenarios this option is useful for. But it looks like we can get increased throughput at the cost of decreased parallelism.

The main point is that Channel<T> has some tuning that is not available in concurrent collections.

Channel<T> is not an Enumerable

Another thing to be aware of when looking at the differences between Channel<T> and ConcurrentQueue<T> (or other concurrent collections) is that Channel<T> is not an IEnumerable<T>. It also does not implement any other collection interfaces (such as ICollection<T>, IReadOnlyCollection<T>, or IList<T>).

IEnumerable<T> is important for 2 reasons.

First, this is important because of LINQ (Language Integrated Query). LINQ provides a set of extension methods on IEnumerable<T> (and IQueryable<T> and some other types) that let us filter, sort, aggregate, group, and transform data in collections.

ConcurrentQueue<T> does implement IEnumerable<T>, so it has all of these extension methods available.

Channel<T> does not implement IEnumerable<T> (the same is true of ChannelReader<T>), so these extension methods are not available.

Second, IEnumerable<T> is important because it allows us to "foreach" through a collection.

ConcurrentQueue<T> does support "foreach".

Channel<T> does not directly support "foreach", but we have seen that ChannelReader<T> has a "ReadAllAsync" method that returns IAsyncEnumerable<T>. So this allows us to use "await foreach" on a channel reader, but it's less direct than using a concurrent collection.
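To make the contrast concrete, here is a small sketch with made-up integer data. The ConcurrentQueue<T> side uses LINQ directly. The Channel<T> side gets the same result with "await foreach" and a manual filter.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;

int[] data = { 1, 2, 3, 4, 5, 6 };

// ConcurrentQueue<T> implements IEnumerable<T>, so LINQ works directly.
// Enumerating does not remove anything from the queue.
var queue = new ConcurrentQueue<int>(data);
int queueEvenSum = queue.Where(n => n % 2 == 0).Sum();

// Channel<T> has no IEnumerable<T>, so we read with "await foreach"
// and do the filtering ourselves. Reading removes items from the channel.
var channel = Channel.CreateUnbounded<int>();
foreach (int value in data)
{
    channel.Writer.TryWrite(value);
}
channel.Writer.Complete();

int channelEvenSum = 0;
await foreach (int item in channel.Reader.ReadAllAsync())
{
    if (item % 2 == 0)
    {
        channelEvenSum += item;
    }
}

Console.WriteLine($"Queue: {queueEvenSum}, items still in queue: {queue.Count}");
Console.WriteLine($"Channel: {channelEvenSum}");
```

Both sums come out the same (12 for this data). The difference is that the queue still holds all 6 items afterward, while the channel has been emptied.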

There is no Peek

A functional difference between Channel<T> and ConcurrentQueue<T> is that Channel<T> does not have a "Peek" function. In a queue, "Peek" allows us to look at the next item in the queue without actually removing it from the queue.

In a Channel (more specifically ChannelReader<T>), there is no "Peek" functionality. When we read an item from the channel, it also removes it from the channel.
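A short sketch of the difference, using a single made-up string item: "TryPeek" on the queue leaves the item in place, while "TryRead" on the channel reader takes it out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;

// Queue: TryPeek looks at the next item without removing it.
var queue = new ConcurrentQueue<string>();
queue.Enqueue("only item");
bool peeked = queue.TryPeek(out var peekedItem);
int countAfterPeek = queue.Count;

// Channel: reading an item also removes it from the channel.
var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("only item");
bool firstRead = channel.Reader.TryRead(out var readItem);
bool secondRead = channel.Reader.TryRead(out _);

Console.WriteLine($"Queue peek: {peeked} ({peekedItem}), count afterward: {countAfterPeek}");
Console.WriteLine($"Channel first read: {firstRead} ({readItem}), second read: {secondRead}");
```

The queue count is still 1 after the peek. On the channel, the second "TryRead" returns false because the first read already removed the item.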

Which to Use?

So the final question is "Should I use Channel<T> instead of ConcurrentQueue<T>?"

Personally, I like the specialization of Channel<T>, since this is what I generally need -- the ability to produce items in one concurrent operation (or set of operations) and read the output in another concurrent operation.

I like how I can be explicit about the processes that are writing to the channel as opposed to the processes that are reading from the channel. I also like the ability to mark the Writer as "Complete" so that I can stop reading. (This is doable with a ConcurrentQueue, but we would need to write our own signaling code for that.)

If ConcurrentQueue<T> is working for you, there is no reason to rip it out and change it to Channel<T> unless there are specific features (such as the optimizations) that you need. But for new code, I would recommend considering if Channel<T> will work for you.

I'm still exploring Channel<T> and figuring out how I can adjust my concurrent/parallel patterns to take advantage of it. Be sure to check back. I'll have more articles on what I've learned.

Happy Coding!

Monday, February 8, 2021

An Introduction to Channels in C#

Channels give us a way to communicate between concurrent (async) operations in .NET. Channel<T> was included in .NET Core 3.0 (so it's been available for a bit over a year now). I first encountered channels in Go (golang) several years ago, so I was really interested when I found out that they are available to use in C#.
We can think of a channel as a sort of thread-safe queue. This means that we can add items to a channel in one concurrent operation and read those items in another. (We can also have multiple readers and multiple writers.)

As an introduction to channels, we'll look at an example that mirrors some code from the Go programming language (golang). This will give us an idea of the syntax and how channels work. In a future article, we'll look at a more real-world example.

The Go Example: I have been exploring Go for a while and looking at the similarities and differences compared to C#. You can look at the Go example (and accompanying article) here: Go (golang) Channels - Moving Data Between Concurrent Processes. The article also mentions a little bit about C# channels.

What is Channel<T>?

The purpose of a channel is to let us communicate between 2 (or more) concurrent operations. Like a thread-safe queue, we can add an item to the queue, and we can remove an item from the queue. Since channels are thread-safe, we can have multiple processes writing to and reading from the channel without worrying about missed writes or duplicate reads.

Update: I've had several questions regarding the difference between Channel<T> and ConcurrentQueue<T> (or other concurrent collections). The short answer: specialization and optimization. For a longer answer, here's an article: What's the Difference between Channel and ConcurrentQueue in C#?

An Example
Let's imagine that we need to transform 1,000 records and save the resulting data. Due to the nature of the operation, the transformation takes several seconds to complete. In this scenario, we can set up several concurrent processes that transform records. When a transformation is complete, it puts the record onto the channel.

In a separate operation, we have a process that listens to that channel. When an item is available, it takes it off of the channel and saves it to the data store. Since this operation is faster (only taking a few milliseconds), we can have a single operation that is responsible for this.

This is an implementation of the Observer pattern -- at its core, it is a publish-subscribe relationship. The channel takes results from the publisher (the transformation process) and provides them to the subscriber (the saving process).

Channel Operations
There are a number of operations available with Channel<T>. The primary operations that we'll look at today include the following: 
  1. Creating a channel
  2. Writing to a channel
  3. Closing a channel when we are done writing to it
  4. Reading from a channel
Channel<T> gives us quite a few ways of accomplishing each of these functions. What we'll see today will give us a taste of what is available, but there is more to explore.

Parallel Operations

The sample project is available on GitHub: https://github.com/jeremybytes/csharp-channels. This project has a service with several end points. One end point provides a single "Person" record based on an ID. This operation has a built-in 1 second delay (to simulate a bit of lag).

Our code is in a console application that gets data -- the goal is to pull out 9 records from the service. If we do this sequentially, then the operation takes 9 seconds. But if we do this concurrently (i.e., in parallel), the operation takes a little over 1 second.

The concurrent operations that call the service put their results onto a channel. Then we have a listener that reads values off of the channel and displays them in the console.

Creating a Channel

To create a channel, we use static methods on the "Channel" class. The two methods we can choose from are "CreateBounded" and "CreateUnbounded". A bounded channel has a constraint on how many items the channel can hold at one time. If a channel is full, a writer can wait until there is space available in the channel.

Whether we have a bounded or an unbounded channel, we must also specify a type (using a generic type parameter). A channel can only hold one type of object.

Here is the code from our sample (line 61 in the Program.cs file in the project mentioned above):

C#
    var channel = Channel.CreateBounded<Person>(10);

The above code creates a bounded channel that can hold up to 10 "Person" objects. Our sample data only has 9 items, so we will not need to worry about the channel reaching capacity for this example.
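Since the sample never fills its channel, here is a separate sketch of what happens when a bounded channel does reach capacity. It uses a capacity of 2 and integer items (both made up for illustration) and relies on the default behavior of a bounded channel, which is for writers to wait when the channel is full.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel that can hold 2 items at a time.
var channel = Channel.CreateBounded<int>(2);

bool first = channel.Writer.TryWrite(1);
bool second = channel.Writer.TryWrite(2);

// The channel is now full, so TryWrite does not add the item.
bool third = channel.Writer.TryWrite(3);

// WriteAsync does not fail; it waits for space to become available.
ValueTask pendingWrite = channel.Writer.WriteAsync(3);
bool finishedWhileFull = pendingWrite.IsCompleted;

// Reading one item frees a slot, which lets the waiting write finish.
channel.Reader.TryRead(out int removed);
await pendingWrite;

Console.WriteLine($"TryWrite results: {first}, {second}, {third}");
Console.WriteLine($"WriteAsync finished while full: {finishedWhileFull}");
Console.WriteLine($"Removed {removed}; the waiting write has now completed");
```

This is the main reason to prefer "WriteAsync" (or "WaitToWriteAsync") over "TryWrite" with a bounded channel: the writer pauses instead of dropping the item.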

Writing to a Channel

A channel has 2 primary properties: a Reader and a Writer. For our channel, the Writer property is a ChannelWriter<Person>. A channel writer has several methods; we'll use the "WriteAsync" method.

Here's the "WriteAsync" method call (line 72 in the Program.cs file):

C#
    await channel.Writer.WriteAsync(person);

Like many things in C#, this method is asynchronous. The "await" will pause the operation until it is complete.

Let's look at the "WriteAsync" method in context. Here is the enclosing section (lines 64 to 81 in the Program.cs file):

C#
    // Write person objects to channel asynchronously
    foreach (var id in ids)
    {
      async Task func(int id)
      {
        try
        {
          var person = await GetPersonAsync(id);
          await channel.Writer.WriteAsync(person);
        }
        catch (HttpRequestException ex)
        {
          Console.WriteLine($"ERROR: ID {id}: {ex.Message}");
        }
      }
      asyncCalls.Add(func(id));
    }

The "foreach" loops through a collection of "ids" (the "id" property of the "Person" object).

Inside the "foreach" loop, we have a local function (named "func" -- not a great name, but it makes the code similar to the anonymous function in the Go example).

The "func" function calls "GetPersonAsync". "GetPersonAsync" calls the service and returns the resulting "person".

After getting the "person", we put that object onto the channel using "Writer.WriteAsync".

The catch block is looking for exceptions that might be thrown from the service -- for example if the service is not available or the service returns an empty object.

The last line of the "foreach" loop does a couple of things. It calls the local "func" function using the "id" value from the loop. "func" is asynchronous and returns a Task.

"asyncCalls" is a "List<Task>". By calling "asyncCalls.Add()", we add the Task from the "func" call to the list. We'll see how this is used later.

There are no blocking operations inside the "foreach" loop. Instead, each iteration of the loop kicks off a concurrent (async) operation. This means that the loop will complete very quickly, and we will have 9 operations running concurrently.

Closing a Channel

When we are done writing to a channel, we should close it. This tells anyone reading from the channel that there will be no more values, and they can stop reading. We'll see why this is important when we look at reading from a channel.

One issue that we have to deal with in our code is that we do not want to close the channel until all 9 of the concurrent operations are complete. This is why we have the "asyncCalls" list. It has a list of all of the operations (Tasks), and we can wait until they are complete.

Here's the code for that (lines 83 to 88 in the Program.cs file):

C#
    // Wait for the async tasks to complete & close the channel
    _ = Task.Run(async () =>
        {
          await Task.WhenAll(asyncCalls);
          channel.Writer.Complete();
        });

"await Task.WhenAll(asyncCalls)" will pause the operation of this section of code until all of the concurrent tasks are complete. When the "await" is done, we know that it is safe to close the channel.

The next line -- "channel.Writer.Complete()" -- closes the channel. This indicates that no new items will be written to the channel.

These 2 lines of code are wrapped in a task (using "Task.Run"). The reason for this is that we want the application to continue without blocking at this point. The "await Task.WhenAll" will wait for all of the tasks, but it waits inside a separate concurrent (asynchronous) operation.

This means that the read operation (in the next section) can start running before all of the concurrent tasks are complete.

If running this section asynchronously is a bit hard to grasp, that's okay. It took me a while to wrap my head around it. For our limited sample (with only 9 records), it does not make any visible difference. But it's a good pattern to get used to for larger, more complex scenarios.
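To see the write, close, and read steps working together without needing the service, here is a self-contained sketch. "FetchAndWriteAsync" is a hypothetical stand-in for the "func" local function: it uses a short "Task.Delay" in place of the call to "GetPersonAsync" and writes a string instead of a "Person" object.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<string>(10);
var asyncCalls = new List<Task>();

// Kick off 9 concurrent operations; the loop itself does not wait.
foreach (int id in Enumerable.Range(1, 9))
{
    asyncCalls.Add(FetchAndWriteAsync(id, channel.Writer));
}

// Wait for the async tasks to complete & close the channel.
// This runs on its own so that reading (below) can start right away.
_ = Task.Run(async () =>
{
    await Task.WhenAll(asyncCalls);
    channel.Writer.Complete();
});

// Read until the channel is closed and empty.
var results = new List<string>();
await foreach (string person in channel.Reader.ReadAllAsync())
{
    results.Add(person);
}

Console.WriteLine($"Read {results.Count} items");

// Hypothetical stand-in for the service call plus the channel write.
static async Task FetchAndWriteAsync(int id, ChannelWriter<string> writer)
{
    await Task.Delay(100); // simulated service lag
    await writer.WriteAsync($"Person {id}");
}
```

If we removed the call to "Complete", the "await foreach" would never exit, and the final "Console.WriteLine" would never run.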

Reading from a Channel

Our last step is to read from a channel -- this uses the "Reader" property from the channel.  For our example, "Reader" is a "ChannelReader<Person>".

There are multiple ways that we can read from the channel. The sample code has an "Option 1" and "Option 2" listed (with one of the options commented out). We'll look at "Option 2" first.

Option: WaitToReadAsync / TryRead
"Option 2" uses a couple of "while" loops to read from the channel. Here is the code (lines 97 to 105 in the Program.cs file):

C#
    // Read person objects from the channel until the channel is closed
    // OPTION 2: Using WaitToReadAsync / TryRead
    while (await channel.Reader.WaitToReadAsync())
    {
      while (channel.Reader.TryRead(out Person person))
      {
        Console.WriteLine($"{person.ID}: {person}");
      }
    }

The first "while" loop makes a call to "WaitToReadAsync" on the channel "Reader" property. When we "await" the "WaitToReadAsync" method, the code will pause until an item is available to read from the channel *or* until the channel is closed.

When an item is available to read, "WaitToReadAsync" returns "true" (so the "while" loop will continue). If the channel is closed, then "WaitToReadAsync" returns "false", and the "while" loop will exit.

So far, this only tells us whether there is an item available to read. We have not done any actual reading. For that, we have the next "while" loop.

The inner "while" loop makes a call to "TryRead" on the channel "Reader" property. As with most "Try" methods, this uses an "out" parameter to hold the value. If the read is successful, then the "person" variable is populated, and the value is written to the console.

"TryRead" will return "false" if there are no items available. If that's the case, then it will exit the inner "while" loop and return to the outer "while" loop to wait for the next available item (or the closing of the channel).

Note: After the channel is closed (meaning the Writer is marked Complete), the Reader will continue to read from the channel until the channel is empty. Once a closed channel is empty, the "WaitToReadAsync" will return false and the loop will exit.

Option: ReadAllAsync
Another option ("Option 1" in the code) is to use "ReadAllAsync". This code is nice because it has fewer steps. But it also uses an IAsyncEnumerable. If you are not familiar with IAsyncEnumerable, then it can be a bit difficult to understand. That's why both options are included.

Let's look at the code that uses "ReadAllAsync" (lines 90 to 95 in the Program.cs file):

C#
    // Read person objects from the channel until the channel is closed
    // OPTION 1: Using ReadAllAsync (IAsyncEnumerable)
    await foreach (Person person in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"{person.ID}: {person}");
    }

"ReadAllAsync" is a method on the channel "Reader" property. This returns an "IAsyncEnumerable<Person>". This combines the power of enumeration ("foreach") with the concurrency of "async".

The "foreach" loop condition looks pretty normal -- we iterate on the result of "ReadAllAsync" and use the "Person" object from that iteration.

What is a little different is that there is an "await" before the "foreach". This means that the "foreach" loop may pause between iterations if it needs to wait for an item to be populated in the channel.

The effect is that the "foreach" will continue to provide values until the channel is closed and empty. Inside the loop, we write the value to the console.

At a high level, this code is a bit easier to understand (since it does not have nested loops and uses a familiar "foreach" loop). But when we look more closely, we see that we "await" the loop, and what returns from "ReadAllAsync" may not be a complete set of data -- our loop may wait for items to be populated in the channel.

I would recommend learning more and getting comfortable with IAsyncEnumerable (which was added with C# 8). It's pretty powerful and, as we've seen here, it can simplify our code. But it only simplifies the code if we understand what it's doing.

Application Output

To run the application, first start the service in the "net-people-service" folder. Navigate to the folder from the command line (I'm using PowerShell here), and type "dotnet run". This starts the service and hosts it at "http://localhost:9874".

Console
    PS C:\Channels\net-people-service> dotnet run
    info: Microsoft.Hosting.Lifetime[0]
        Now listening on: http://localhost:9874
    info: Microsoft.Hosting.Lifetime[0]
        Application started. Press Ctrl+C to shut down.
    info: Microsoft.Hosting.Lifetime[0]
        Hosting environment: Development
    info: Microsoft.Hosting.Lifetime[0]
        Content root path: C:\Channels\net-people-service

To run the application, navigate to the "async" folder (from a new command line) and type "dotnet run".

Console
    PS C:\Channels\async> dotnet run
    [1 2 3 4 5 6 7 8 9]
    9: Isaac Gampu
    3: Turanga Leela
    7: John Sheridan
    4: John Crichton
    6: Laura Roslin
    5: Dave Lister
    2: Dylan Hunt
    1: John Koenig
    8: Dante Montana
    Total Time: 00:00:01.3324302
    PS C:\Channels\async>

There are a couple of things to note about this output. First, notice the "Total Time" at the bottom. This takes a bit over 1 second to complete. As mentioned earlier, each call to the service takes at least 1 second (due to a delay in the service). This shows us that all 9 of these service calls happen concurrently.

Next, notice the order of the output. It is non-deterministic. Even though we have a "foreach" loop that goes through the ids in sequence (1, 2, 3, 4, 5, 6, 7, 8, 9), the output is not in the same order. This is one of the joys of running things in parallel -- the order things complete is not the same as the order things were started.

If we run this application multiple times, the items will be in a different order each time. (Okay, not different *each* time since we are only dealing with 9 values, but we cannot guarantee the order of the results.)

There's Much More

In this article, we've looked at some of the functions of Channel<T>, including creating, writing to, closing, and reading from channels. But Channel<T> has much more functionality.

When we create a channel (using "CreateBounded" or "CreateUnbounded"), we can also include a set of options. These include "SingleReader" and "SingleWriter" to indicate that we will only have one read or write operation at a time. This allows the channel implementation to optimize its operations. In addition, there is an "AllowSynchronousContinuations" option. This can increase throughput at the cost of reduced parallelism.

ChannelWriter<T> has several methods in addition to "WriteAsync". These include "WaitToWriteAsync" and "TryWrite" which we may need in scenarios where we have bounded channels and we exceed the capacity. There is also a "TryComplete" method that tries to close the channel.

ChannelReader<T> has many methods as well. Today, we saw "ReadAllAsync", "WaitToReadAsync", and "TryRead". But there is also a "ReadAsync". We can mix and match these methods in different ways depending on what our needs are.
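As one example of mixing things differently, here is a sketch that reads with "ReadAsync" (the integer values are made up). "ReadAsync" waits for the next item and returns it. When the channel is closed and empty, it throws a ChannelClosedException, so that exception becomes the exit signal for the loop.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(10);
channel.Writer.TryWrite(20);
channel.Writer.Complete();

var items = new List<int>();
bool sawClosed = false;

try
{
    while (true)
    {
        // Waits for the next item and removes it from the channel.
        items.Add(await channel.Reader.ReadAsync());
    }
}
catch (ChannelClosedException)
{
    // Thrown once the channel is closed and there is nothing left to read.
    sawClosed = true;
}

Console.WriteLine($"Items: {string.Join(", ", items)}; closed: {sawClosed}");
```

Using an exception to end a loop is a bit awkward, which is one reason the "WaitToReadAsync" / "TryRead" and "ReadAllAsync" options shown earlier tend to read more cleanly.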

Wrap Up

Channel<T> is really interesting to me. When I started looking at Go (golang) a few years back, I found channels to be clean and easy to use. I wondered if it would make sense to have something similar in C#. Apparently, I was not the only one. Channel<T> uses many of the same concepts as channels in Go, but with a bit more power and functionality. This adds complexity as well (but I've learned to accept complexity in the C# world).

In today's example, we saw how Channel<T> lets us communicate between concurrent operations. In our case, we have 9 concurrent operations that all write their results to a channel. Then we have a reader that pulls the results from the channel and displays them on the console.

This allowed us to see some basics of creating, writing to, closing, and reading from channels. But the real world can get a bit more complicated.

I have been experimenting with channels in my digit recognition project (https://github.com/jeremybytes/digit-display). This uses machine learning to recognize hand-written digits from bitmap data. These operations are computationally expensive, so I've been using tasks with continuations to run them in parallel. But I've also created versions that use channels. This simplifies some things (I don't need to be as concerned about getting back to the UI thread in the desktop application). But it creates issues in others (using a Parallel.ForEach can end up starving the main application thread). I've got a bit more experimenting to do. The solution that I have right now works, but it doesn't feel quite right.

As always, there is still more to learn.

Happy Coding!

Tuesday, February 2, 2021

Go (golang) Anonymous Functions - Inlining Code for Goroutines

Anonymous functions in Go (golang) let us inline code rather than having a separate function. These are often used for goroutines (concurrent operations). Similar to anonymous delegates and lambda expressions in C#, anonymous functions in Go also support captured variables (referred to as closures). This can simplify our function signatures. Today, we'll look at how these fit into our code.
Anonymous functions allow us to inline code by creating a function with no name. Combined with closures, this can simplify function signatures for goroutines.
We'll continue with the sample that we've seen in the previous articles ("Go (golang) Channels - Moving Data Between Concurrent Processes" and "Go (golang) WaitGroup - Signal that a Concurrent Operation is Complete") to see anonymous functions with concurrent code.

Note: I would highly recommend reading the previous articles before continuing here. This article builds on the concurrency concepts of channels and WaitGroup.

Motivation: I have been using C# as a primary language for 15 years. Exploring other languages gives us insight into different ways of handling programming tasks. Even though Go (golang) is not my production language, I've found several features and conventions to be interesting. By looking at different paradigms, we can grow as programmers and become more effective in our primary language.

Resources: For links to a video walkthrough, CodeTour, and additional articles, see A Tour of Go for the C# Developer.

Anonymous Function Syntax

Let's start by looking at an example of an anonymous function.

Go
    go func(message string) {
      fmt.Println(message)
    }("hello")

This starts a new goroutine (i.e., kicks off a concurrent operation).

The anonymous function starts with "func" and then parameters in parentheses. This function takes a single string parameter named "message". If there are no parameters, use a set of empty parentheses. The main difference between this and a normal function declaration is that there is no name for the function.

The body of the function (enclosed in braces) is just like a normal function declaration.

After the closing brace, we include the parameter values enclosed in parentheses. This is because we are not just declaring the anonymous function, we are also calling it. In this example, the string "hello" is used for the "message" parameter.

Important: If an anonymous function has *no* parameters, you must include a set of empty parentheses after the closing brace to indicate that you are calling the function. (I keep forgetting this, and the compiler error does not make it obvious to me what is wrong.)
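Here is a small runnable sketch of both call forms. The "collectMessages" helper and the WaitGroup bookkeeping are my own additions for illustration (they are not part of the sample project); they are only there so that the program waits for both goroutines before printing.

```go
package main

import (
	"fmt"
	"sync"
)

// collectMessages is a hypothetical helper: it runs two anonymous functions
// as goroutines and returns what each one produced.
func collectMessages() []string {
	results := make([]string, 2)
	var wg sync.WaitGroup
	wg.Add(2)

	// With a parameter: the value is supplied in the trailing parentheses.
	go func(message string) {
		defer wg.Done()
		results[0] = message
	}("hello")

	// With no parameters: the empty trailing parentheses are still required,
	// because they are what actually calls the function.
	go func() {
		defer wg.Done()
		results[1] = "no parameters"
	}()

	wg.Wait()
	return results
}

func main() {
	fmt.Println(collectMessages())
}
```

Each goroutine writes to its own slice element, so the two writes do not interfere with each other.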

Using Anonymous Functions

Let's take the example we used when discussing channels and WaitGroup and add anonymous functions. As a reminder, here's how we left the "fetchPersonToChannel" function:

Go
    func fetchPersonToChannel(id int, ch chan<- person, wg *sync.WaitGroup) {
      defer wg.Done()
      p, err := getPerson(id)
      if err != nil {
        log.Printf("err calling getPerson(%d): %v", id, err)
        return
      }
      ch <- p
    }

And the code that calls the function concurrently:

Go
    ch := make(chan person, 10)
    var wg sync.WaitGroup

    // put values onto a channel
    for _, id := range ids {
      wg.Add(1)
      go fetchPersonToChannel(id, ch, &wg)
    }

    wg.Wait()
    close(ch)

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

For our code, we want to take the "fetchPersonToChannel" function and inline it. This will put the body of the function right after the "go" in the first "for" loop.

Inlining the Code
Here's what the code looks like once we add the anonymous function:

Go
    ch := make(chan person, 10)
    var wg sync.WaitGroup

    // put values onto a channel
    for _, id := range ids {
      wg.Add(1)
      go func(id int, ch chan<- person, wg *sync.WaitGroup) {
        defer wg.Done()
        p, err := getPerson(id)
        if err != nil {
          log.Printf("err calling getPerson(%d): %v", id, err)
          return
        }
        ch <- p
      }(id, ch, &wg)
    }

    wg.Wait()
    close(ch)

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

This block of code has the same functionality as the code with the separate function (more or less - there are a few technical differences).

Let's look at the pieces of the anonymous function a bit more closely. Here is just that part of the code:

Go
    go func(id int, ch chan<- person, wg *sync.WaitGroup) {
      defer wg.Done()
      p, err := getPerson(id)
      if err != nil {
        log.Printf("err calling getPerson(%d): %v", id, err)
        return
      }
      ch <- p
    }(id, ch, &wg)

After the "func" keyword, we have the parameters. These are the same parameters as the separate named function (we'll see how we can simplify this in just a bit).

After the body of the function (the last line), we have a set of parentheses with the parameter values for the function. In this case, we get the "id" value from the for loop, and the "ch" and "wg" values from the variables created earlier.

Things look a little strange because the names of the parameters (id, ch, wg) in the function declaration match the names of the variables (id, ch, wg) that are passed into the function at the bottom.

Simplifying the Function with Closures

In C#, anonymous delegates and lambda expressions can capture variables. This means that they can use a variable that is in the scope of the declaration even if the variable is not explicitly passed in to the anonymous delegate / lambda expression. In C#, these are referred to as "captured variables"; in Go (and most other languages), these are referred to as "closures".

For our example, since the channel ("ch") and WaitGroup ("wg") are both part of the enclosing scope, we can use these in the anonymous function without passing them in as parameters.

Here's what the code looks like when we do this:

Go
    ch := make(chan person, 10)
    var wg sync.WaitGroup

    // put values onto a channel
    for _, id := range ids {
      wg.Add(1)
      go func(id int) {
        defer wg.Done()
        p, err := getPerson(id)
        if err != nil {
          log.Printf("err calling getPerson(%d): %v", id, err)
          return
        }
        ch <- p
      }(id)
    }

    wg.Wait()
    close(ch)

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

The anonymous function now only takes one parameter: the "id". The channel and WaitGroup are no longer passed as parameters.

The body of the function has not changed. Inside the function we still reference "ch" and "wg", but instead of referring to parameters, these refer to the variables that are created before the "for" loop.
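To try the closure version without the web service, here is a self-contained sketch. The "person" type, the "getPerson" stub, and the "fetchPeople" wrapper are assumptions made for this example; the real "getPerson" in the sample calls a service. I also size the channel to the number of ids so that the writers never block while "main" is still waiting.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// person and getPerson are hypothetical stand-ins for the types in the sample.
type person struct {
	ID   int
	Name string
}

func getPerson(id int) (person, error) {
	if id <= 0 {
		return person{}, fmt.Errorf("invalid id %d", id)
	}
	return person{ID: id, Name: fmt.Sprintf("Person %d", id)}, nil
}

// fetchPeople wraps the article's pattern in a function so it can be reused.
func fetchPeople(ids []int) []person {
	ch := make(chan person, len(ids))
	var wg sync.WaitGroup

	for _, id := range ids {
		wg.Add(1)
		// "ch" and "wg" are captured from the enclosing scope;
		// only "id" is passed as a parameter.
		go func(id int) {
			defer wg.Done()
			p, err := getPerson(id)
			if err != nil {
				log.Printf("err calling getPerson(%d): %v", id, err)
				return
			}
			ch <- p
		}(id)
	}

	wg.Wait()
	close(ch)

	var people []person
	for p := range ch {
		people = append(people, p)
	}
	return people
}

func main() {
	for _, p := range fetchPeople([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}) {
		fmt.Printf("%d: %v\n", p.ID, p)
	}
}
```

The order of the results can change from run to run, since the goroutines finish in no particular order.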

What I like about this
I like that when we use an anonymous function, we can simplify the function signature. The channel and WaitGroup parameters strike me as "infrastructure" parameters -- things we need to get the concurrent code to work properly.

I also like that we no longer have to worry about pointers. Previously, we had to use a pointer to a WaitGroup as a parameter so that things would work. Since the WaitGroup is now captured by the closure, we don't have any pointers that we deal with directly.

Don't Capture Indexers

In C#, there is a danger using indexers in a closure (see "Lambda Expressions, Captured Variables, and For Loops: A Dangerous Combination"). This same danger exists in Go.

In the code above, we keep "id" as a parameter. It is tempting to use a closure for this as well. But if we do this, we end up with unexpected behavior.

Let's see what happens when we capture the "id" value. Here's that code:

Go (DANGER DANGER DANGER - do not copy this code block)
    ch := make(chan person, 10)
    var wg sync.WaitGroup

    // put values onto a channel
    for _, id := range ids {
      wg.Add(1)
      go func() {
        defer wg.Done()
        p, err := getPerson(id)
        if err != nil {
          log.Printf("err calling getPerson(%d): %v", id, err)
          return
        }
        ch <- p
      }()
    }

    wg.Wait()
    close(ch)

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

Now the anonymous function takes no parameters. Here is the output:

Console
    PS C:\GoDemo\async> .\async.exe
    [1 2 3 4 5 6 7 8 9]
    9: Isaac Gampu
    9: Isaac Gampu
    9: Isaac Gampu
    9: Isaac Gampu
    9: Isaac Gampu
    9: Isaac Gampu
    9: Isaac Gampu
    9: Isaac Gampu
    4: John Crichton

Instead of getting 9 separate values, we find the "id" value of "9" repeated over and over. This is because of the way that closures work.
The value of a variable in a closure is the value at the time the variable is used, not the value at the time it was captured.
This means that if we capture an indexer or an iterator, we often get the final value of that indexer. For this particular example, the last value is "9", so that is what we see. There is one value of "4" -- we get this because of the "fun" of concurrency. The first call to the anonymous function ran before the iteration was complete, so the "id" has an intermediate value. The rest of the calls to the anonymous function ran after the iteration was done, so "id" has the final value.

The bad news is that we do not get a compiler warning if we try to capture an indexer or iterator value. The good news is that a good linting tool (such as the tool included with the "Go" extension for Visual Studio Code) will warn us when we make this error.
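If we really do want a parameterless anonymous function, one common idiom is to copy the loop variable into a new variable inside the loop body and capture the copy. The sketch below uses a hypothetical "collectIDs" function that just sends each id to a channel (no service call) so that the effect is easy to see. Note that for modules that declare Go 1.22 or later, each loop iteration already gets its own loop variable, so the copy is redundant there; it is harmless, and it keeps the code correct on older versions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectIDs is a hypothetical helper that sends each id through a goroutine
// and returns the ids that arrived, sorted for a stable result.
func collectIDs(ids []int) []int {
	ch := make(chan int, len(ids))
	var wg sync.WaitGroup

	for _, id := range ids {
		id := id // a new variable for this iteration; the closure captures the copy
		wg.Add(1)
		go func() {
			defer wg.Done()
			ch <- id
		}()
	}

	wg.Wait()
	close(ch)

	var seen []int
	for id := range ch {
		seen = append(seen, id)
	}
	sort.Ints(seen)
	return seen
}

func main() {
	fmt.Println(collectIDs([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}))
}
```

Passing "id" as a parameter (as in the earlier code) and copying it inside the loop both give each goroutine its own value.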

Another Anonymous Function

There is one more anonymous function that we can add to our block of code. We can wrap the code that waits for the WaitGroup and closes the channel.

Here's that code (note that the "id" parameter is back in the first anonymous function):

Go
    ch := make(chan person, 10)
    var wg sync.WaitGroup

    // put values onto a channel
    for _, id := range ids {
      wg.Add(1)
      go func(id int) {
        defer wg.Done()
        p, err := getPerson(id)
        if err != nil {
          log.Printf("err calling getPerson(%d): %v", id, err)
          return
        }
        ch <- p
      }(id)
    }

    go func() {
      wg.Wait()
      close(ch)
    }()

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

Notice the code between the "for" loops. We have a goroutine with an anonymous function that takes no parameters. This will kick off this code concurrently.

How does this change the flow of the application?
The change to how the application works is subtle, but it's interesting. (Note: if it doesn't make sense right away, that's fine. It took me a little while to wrap my head around it.)

With this goroutine in place, we end up with 11 total goroutines. This includes the 9 goroutines created in the first "for" loop, the goroutine with the "Wait" and "close", and the "main" function (the "main" function is also a goroutine).

In our previous code, the "main" function blocked at the "wg.Wait" call. With the new code, the "main" function is not blocked here (since it is inside a goroutine). Instead, the "main" function will block with the second "for" loop waiting for the channel to be closed.

The goroutine with the "Wait" will block until the WaitGroup hits 0. But this blocks only within the goroutine itself. Once the WaitGroup hits 0, then the channel will be closed.

Once the channel is closed, the second "for" loop in the "main" function will complete, and the application continues as normal.

From the outside, the behavior is the same. We still have the separate concurrent operations that call a web service and write to a channel. We still have the WaitGroup that signals when these concurrent operations are complete (so the channel can be closed). And we still have the "for" loop that reads from the channel until that channel is closed.

Internally, we have a bit more concurrency. The WaitGroup "Wait" is happening in a separate goroutine.
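One practical consequence that I believe follows from this arrangement: because "main" is already reading while the writers are still running, the channel no longer has to be large enough to hold every item. Here is a sketch with a hypothetical "sumSquares" function (not from the sample project) that sends 9 values through a channel with a capacity of only 2.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares is a hypothetical example: each goroutine writes one value,
// and the caller reads until the channel is closed.
func sumSquares(nums []int) int {
	ch := make(chan int, 2) // deliberately smaller than len(nums)
	var wg sync.WaitGroup

	for _, n := range nums {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			ch <- n * n // may block until the reader makes room
		}(n)
	}

	// Wait and close in a separate goroutine so the reader below can start right away.
	go func() {
		wg.Wait()
		close(ch)
	}()

	total := 0
	for v := range ch {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})) // 285
}
```

If "wg.Wait()" ran directly in the calling function before the read loop, the writers would fill the channel and block, "Done" would never be called for the remaining goroutines, and the program would hang.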

Anonymous functions make this easier
We may or may not want to add another layer of concurrency to the application. But using an anonymous function with a goroutine makes it a lot easier. If we wanted to use a normal named function, we would need to include parameters for the channel and WaitGroup, and it's probably a bit more effort than we want to go through.

But when we can use an anonymous function with a closure for the channel and WaitGroup, there is less code for us to manage.

A Final Example

When we looked at "WaitGroup", we saw an example where we could use a WaitGroup to stop an application from exiting too early. Let's see what happens when we use an anonymous function here.

Here's the original code:

Go
    func logMessages(count int, wg *sync.WaitGroup) {
      defer wg.Done()
      for i := 0; i < count; i++ {
        log.Printf("Logging item #%d\n", i)
        time.Sleep(1 * time.Second)
      }
    }

    func main() {
      var wg sync.WaitGroup
      wg.Add(1)
      go logMessages(10, &wg)
      time.Sleep(3 * time.Second)
      wg.Wait()
      fmt.Println("Done")
    }

And here's the same code with "logMessages" as an anonymous function:

Go
    func main() {
      var wg sync.WaitGroup
      wg.Add(1)
      go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ {
          log.Printf("Logging item #%d\n", i)
          time.Sleep(1 * time.Second)
        }
      }()
      time.Sleep(3 * time.Second)
      wg.Wait()
      fmt.Println("Done")
    }

Like our other example, we can remove the "WaitGroup" parameter from the anonymous function and rely on a closure for the WaitGroup.

For the "count" parameter, it doesn't really make much sense to have a parameter for this since we are passing in a hard-coded value ("10"). So, I removed the parameter and put the "10" into the conditional of the "for" loop.

The output of this code is the same as we saw in the prior article.

Wrap Up

Anonymous functions and goroutines work well together. We can inline our code and simplify function signatures by using closures. I have found anonymous delegates and lambda expressions to be very useful in the C# world, and anonymous functions in Go have a lot of similarities.

There is more to learn about anonymous functions. For example, we can assign an anonymous function to a variable (similar to using a delegate variable in C#). In this article, we focused on using anonymous functions with goroutines. But there is more to explore.
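As a quick taste of assigning an anonymous function to a variable, here is a minimal sketch. The names "double" and "apply" are made up for this example.

```go
package main

import "fmt"

// apply is a hypothetical helper that calls whatever function it is given.
func apply(f func(int) int, value int) int {
	return f(value)
}

func main() {
	// The anonymous function is stored in a variable instead of being called immediately,
	// so there are no trailing parentheses after the closing brace.
	double := func(n int) int {
		return n * 2
	}

	fmt.Println(double(21))       // 42
	fmt.Println(apply(double, 5)) // 10
}
```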

Happy Coding!

Monday, February 1, 2021

Go (golang) WaitGroup - Signal that a Concurrent Operation is Complete

In the last article, we looked at using channels in Go to get data from one concurrent operation to another (Go (golang) Channels - Moving Data Between Concurrent Processes). But we also ran across a problem: How do we know when a concurrent operation is complete? One answer is that we can use a WaitGroup.
A WaitGroup is a counter that we can add to and subtract from. WaitGroup.Wait() is a blocking operation that will wait until the counter reaches zero.
By using a WaitGroup, we can signal that a concurrent operation is complete so that we can continue processing.

Concurrency in Go is a bit different than concurrency in C#. Today, we'll look at why we need a WaitGroup in Go and how we can use it in our applications.

Motivation: I have been using C# as a primary language for 15 years. Exploring other languages gives us insight into different ways of handling programming tasks. Even though Go (golang) is not my production language, I've found several features and conventions to be interesting. By looking at different paradigms, we can grow as programmers and become more effective in our primary language.

Resources: For links to a video walkthrough, CodeTour, and additional articles, see A Tour of Go for the C# Developer.

The Problem

In the last article, we saw a potential problem when it came to using channels and concurrent operations. Let's look at that example.

We have a function that gets data from a service (a "person") and puts it onto a channel.

Go
    func fetchPersonToChannel(id int, ch chan<- person) {
      p, err := getPerson(id)
      if err != nil {
        log.Printf("err calling getPerson(%d): %v", id, err)
        return
      }
      ch <- p
    }

The last line of this function puts the person ("p") onto the channel ("ch").

For more information on writing to a channel, see the previous article.

In the main function of the application, we have a "for" loop that calls this function multiple times, and we have a "for" loop that reads from the channel and displays the data.

Go
    ch := make(chan person, 10)

    // put values onto a channel
    for _, id := range ids {
      go fetchPersonToChannel(id, ch)
    }

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

The first "for" loop iterates through the "ids" collection and calls the "fetchPersonToChannel" function. Notice that this is a goroutine (using the "go" keyword), which means that the loop continues without waiting for each call to "fetchPersonToChannel" to complete. This creates multiple concurrent operations.

The second "for" loop reads values from the channel until the channel is closed. It takes the value from the channel and outputs it to the console.

For more information on reading from a channel, see the previous article.

But there's a problem...
The channel is not closed anywhere in the code. As a reminder, when we read from an empty, open channel, the operation blocks until there is an item to read.

Since the channel is never closed, the second "for" loop will block indefinitely.

Can we just close the channel?
Since the channel needs to be closed, can we just close it? Let's consider the following code:

Go
    ch := make(chan person, 10)

    // put values onto a channel
    for _, id := range ids {
      go fetchPersonToChannel(id, ch)
    }

    close(ch)

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

This is the same as above except we have a "close(ch)" after the first "for" loop. Unfortunately, this code will not work as expected.

Since the goroutines (i.e., the concurrent calls to "fetchPersonToChannel") take a little bit of time to complete, the channel will be closed before the goroutines finish. The result is that the "fetchPersonToChannel" function will try to write to a closed channel. And this will cause a panic (similar to an unhandled runtime exception in C#).
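To see the panic in isolation without crashing the program, here is a small sketch. It uses "recover" (which is not covered in this article) inside a deferred function to catch the panic; the "sendAfterClose" function is a made-up name for this example.

```go
package main

import "fmt"

// sendAfterClose is a hypothetical helper that writes to a closed channel
// and reports whether that write caused a panic.
func sendAfterClose() (panicked bool) {
	// recover only works inside a deferred function; it stops the panic
	// so that we can report it instead of exiting.
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()

	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // writing to a closed channel panics, even though the buffer has room
	return false
}

func main() {
	fmt.Println("panicked:", sendAfterClose())
}
```

In real code, we would not recover from this; the better fix is to make sure the channel is closed only after all of the writers are done.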

We need to wait for the goroutines to finish before we close the channel. And that's where "WaitGroup" comes in.

WaitGroup

"WaitGroup" is a type in the "sync" package. It has limited functionality (just 3 methods), but it will help us solve our issue.

WaitGroup Members
  • Add(int) -- Increments the counter based on the parameter (generally "1").
  • Done() -- Decrements the counter by one.
  • Wait() -- Blocks until the counter is 0.
Let's put these into action.

WaitGroup.Add() & WaitGroup.Wait()

Here's an update to the block of code with the 2 "for" loops:

Go
    ch := make(chan person, 10)
    var wg sync.WaitGroup

    // put values onto a channel
    for _, id := range ids {
      wg.Add(1)
      go fetchPersonToChannel(id, ch, &wg)
    }

    wg.Wait()
    close(ch)

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

The second line declares a "WaitGroup" variable named "wg".

Inside the first "for" loop, we use "Add(1)" to increment the counter. Since the "ids" array has 9 values, the counter will quickly increment to "9".

Also inside the first "for" loop, we have changed the signature of the "fetchPersonToChannel" function. It now takes an additional parameter -- a pointer to a WaitGroup (we'll see the updated function in just a bit). We need to pass a pointer to the WaitGroup so that it can be updated from within the function. (Pointers are a bigger topic that we won't go into today.)

After the first "for" loop, we call "Wait()" on our WaitGroup variable. This will block until the WaitGroup counter reaches zero. (We'll see how the counter gets decremented in just a bit.)

After the WaitGroup reaches zero, we close the channel. This ensures that the channel is not closed until after all of the goroutines are done (and we are done writing to the channel).

WaitGroup.Done()

We decrement the counter inside the "fetchPersonToChannel" function. Here's the updated function:

Go
    func fetchPersonToChannel(id int, ch chan<- person, wg *sync.WaitGroup) {
      defer wg.Done()
      p, err := getPerson(id)
      if err != nil {
        log.Printf("err calling getPerson(%d): %v", id, err)
        return
      }
      ch <- p
    }

We added a third parameter to the function signature: a pointer to a WaitGroup called "wg".

The first line of the function is "defer wg.Done()". The "Done" function will decrement the counter.

More important is the "defer". As we saw in a prior article (Go (golang) defer - A Better “finally”), "defer" is similar to a "finally" in C# -- it will run this line of code before the function exits.

This function can exit in 2 ways: (1) if there is an error, the error is logged and the function exits; (2) if there is no error, the data is put onto the channel and the function exits. Either way, the "defer" will run and "Done" will be called on the WaitGroup.
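Here is a runnable sketch that exercises both exit paths. The "lookup" stub (which fails for odd ids), "fetchToChannel", and "countSuccesses" are all hypothetical names standing in for the sample's service call and main function.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// lookup is a hypothetical stand-in for getPerson: odd ids fail.
func lookup(id int) (string, error) {
	if id%2 != 0 {
		return "", errors.New("not found")
	}
	return fmt.Sprintf("item %d", id), nil
}

func fetchToChannel(id int, ch chan<- string, wg *sync.WaitGroup) {
	defer wg.Done() // runs on both exit paths below
	item, err := lookup(id)
	if err != nil {
		return // exit path 1: error
	}
	ch <- item // exit path 2: success
}

// countSuccesses starts one goroutine per id and counts the items that arrive.
func countSuccesses(ids []int) int {
	ch := make(chan string, len(ids))
	var wg sync.WaitGroup

	for _, id := range ids {
		wg.Add(1)
		go fetchToChannel(id, ch, &wg)
	}

	// Wait only returns because Done was called for the failures as well.
	wg.Wait()
	close(ch)

	count := 0
	for range ch {
		count++
	}
	return count
}

func main() {
	fmt.Println(countSuccesses([]int{1, 2, 3, 4})) // 2
}
```

If "wg.Done()" were only called after the write to the channel, the two failing calls would never decrement the counter, and "wg.Wait()" would block forever.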

Program Flow

Here's the overall flow:
  1. When the first "for" loop runs, the counter is incremented (with "wg.Add") before each goroutine is started. In this case, it quickly increases the counter to 9.
  2. The code hits the "wg.Wait()" call and pauses.
  3. Inside each goroutine, the counter is decremented (with "wg.Done"). The counter decreases until it reaches zero.
  4. When the counter reaches zero, "wg.Wait()" stops waiting and the channel is closed.
  5. The second "for" loop reads items from the channel.
  6. Since the channel is closed, the for loop will exit once all of the values have been read from the channel.

Exiting Too Early

So far, our problem has been a blocking operation that hangs the application. But with concurrent operations, we may have the opposite problem: our application may exit before a concurrent operation has finished. Let's consider an example:

Go
    func logMessages(count int) {
      for i := 0; i < count; i++ {
        log.Printf("Logging item #%d\n", i)
        time.Sleep(1 * time.Second)
      }
    }

    func main() {
      go logMessages(10)
      time.Sleep(3 * time.Second)
      fmt.Println("Done")
    }

In this code, we have a "logMessages" function that logs a number of messages. By default, these messages go to the console.

Notice the "time.Sleep()" function call. This pauses for 1 second between each log message. So, if we call the "logMessages" function with a parameter of 10, this function will take (at least) 10 seconds to complete.

In the "main" function, we call "logMessages" concurrently (using "go"). Then, the function sleeps for 3 seconds, and finally, it prints "Done" to the console.

After this, the application exits. Here is a sample output:

Console:
    PS C:\GoDemo\waiting> .\waiting.exe
    2021/02/02 07:25:37 Logging item #0
    2021/02/02 07:25:38 Logging item #1
    2021/02/02 07:25:39 Logging item #2
    Done
    2021/02/02 07:25:40 Logging item #3
    PS C:\GoDemo\waiting>

In this run, the logging function has a chance to output 4 messages before the application exits. (It's interesting that the "Done" is printed before the last message, but that's just part of the "fun" when dealing with concurrency.)

Keeping the Application Alive
We can use a WaitGroup to keep the application alive until the concurrent operation completes. Here's the same application with a WaitGroup added:

Go
    func logMessages(count int, wg *sync.WaitGroup) {
      defer wg.Done()
      for i := 0; i < count; i++ {
        log.Printf("Logging item #%d\n", i)
        time.Sleep(1 * time.Second)
      }
    }

    func main() {
      var wg sync.WaitGroup
      wg.Add(1)
      go logMessages(10, &wg)
      time.Sleep(3 * time.Second)
      wg.Wait()
      fmt.Println("Done")
    }

The "logMessages" now takes a pointer to a WaitGroup as a parameter. We also have a new line in the function (the first line) -- "defer wg.Done()". This calls "Done" once the function is complete.

In the "main" function, we added a WaitGroup variable (called "wg") and immediately call "Add(1)". Then we call "logMessages" concurrently (passing in the WaitGroup as a parameter).

After sleeping for 3 seconds, we call "Wait" on the WaitGroup. This blocks the application until the "logMessages" function is complete.

Finally, we print "Done" and exit.

In the output, we can see that this application waits for the concurrent operation to complete before exiting:

Console:
    PS C:\GoDemo\waiting> .\waiting.exe
    2021/02/02 07:31:27 Logging item #0
    2021/02/02 07:31:28 Logging item #1
    2021/02/02 07:31:29 Logging item #2
    2021/02/02 07:31:30 Logging item #3
    2021/02/02 07:31:31 Logging item #4
    2021/02/02 07:31:32 Logging item #5
    2021/02/02 07:31:33 Logging item #6
    2021/02/02 07:31:34 Logging item #7
    2021/02/02 07:31:35 Logging item #8
    2021/02/02 07:31:36 Logging item #9
    Done
    PS C:\GoDemo\waiting>

With the WaitGroup in place, we can keep the application running until all of the goroutines have a chance to finish.

Different from C#

This process is quite a bit different from C#, and that mainly has to do with the way concurrent operations are coded. In C#, the concurrent operations can be represented with a Task or a set of Tasks. Then we either "await" the Task or use "Task.WaitAll" to pause until all of the Tasks have completed.

But in Go, we do not have Tasks. Instead we have goroutines, and a goroutine does not have an external way to tell that it has completed — there is no way to “await” a goroutine. So we need to use something like a WaitGroup.

These different approaches have pros and cons. Tasks in C# are very flexible and powerful, and they have a lot of features. But they are also complex and take time to fully understand.

Goroutines in Go do not have many features -- mostly, we can kick off a concurrent operation. The limit to features is nice because that's all we have to learn: just put a "go" in front of a function and it runs concurrently. But since we cannot know when a goroutine has completed, we need to resort to external items like a WaitGroup to know when things are done.

Anonymous Functions

This code is not complete yet. In Go, it is very common to use anonymous functions for goroutines. An anonymous function is a way to inline function code.

Like lambda expressions and anonymous delegates in C#, anonymous functions in Go can capture variables that are in scope. These are known as "closures" in Go and most other languages; in C#, we call them "captured variables".

Anonymous functions simplify some things in the sample we've looked at. By using captured variables, we can remove some parameters from the function signature, and we can also do away with the need to reference pointers.

I won't show what that code looks like here. We'll look at anonymous functions in the next article.


Wrap Up

"WaitGroup" in Go is very useful. Since we cannot directly know when a goroutine has completed, we can use a WaitGroup (with "Add()" and "Done()") to keep track. We can use the WaitGroup to pause our application using "Wait()". Once all of the concurrent operations are complete, the WaitGroup will stop blocking, and the rest of our code will run.

As I've mentioned before, I find it really interesting to explore different approaches to programming problems. In this case, Go and C# have fairly different approaches to concurrent programming. But we can learn from both languages and expand the way that we think about our solutions.

Happy Coding!

Sunday, January 31, 2021

Go (golang) Channels - Moving Data Between Concurrent Processes

Go has concurrency baked in to the language -- concurrent operations are referred to as "goroutines". But concurrency is a little more complicated than running multiple operations concurrently. We often need to move data from one operation to another. For example, we may have an operation that produces data and another operation that displays or processes that data. One way of doing this in Go is by using a channel.
A channel is a queue that allows us to send data between concurrent operations.
Let's take a look at channels in Go to see how we can use them to communicate between concurrent operations. And we'll do a quick comparison to Channel<T> in C#.

Motivation: I have been using C# as a primary language for 15 years. Exploring other languages gives us insight into different ways of handling programming tasks. Even though Go (golang) is not my production language, I've found several features and conventions to be interesting. By looking at different paradigms, we can grow as programmers and become more effective in our primary language.

Resources: For links to a video walkthrough, CodeTour, and additional articles, see A Tour of Go for the C# Developer.

Go Channels

A channel in Go only has a few operations. We can put an item onto a channel, and we can take an item off of a channel -- similar to "enqueue" and "dequeue" on a Queue in C#.

If a channel is full (meaning, it has reached capacity), then trying to add an item to the channel will block until there is space available. If a channel is empty, then trying to take an item off of the channel will block until there is an item to read. We'll see how this affects things in just a bit.

Before using a channel, we create it (using "make") and decide the type that it holds and how many items it can hold. After we're done putting things onto the channel, we can close it. This will indicate to anyone pulling items off of the channel that there will be no more items.

Let's look at each of these features more closely.

Creating a Channel

To create a channel, we use the built-in "make" function. Here's an example of creating a channel that holds an integer:

Go
    ch := make(chan int)

By default, a channel is unbuffered (it has a capacity of "0"), meaning it cannot hold any items on its own. Putting an item onto an unbuffered channel blocks until another goroutine is ready to take that item off, so the item is handed directly from the writer to the reader.

We can also pass a capacity to the "make" function. The following code creates a channel that can hold 10 integers:

Go
    ch := make(chan int, 10)

Managing the capacity of a channel is important. When I was first starting, I left off the capacity and used the default (an unbuffered channel). This led to blocking operations that were difficult to debug. Make sure that the channel is big enough to keep things flowing.
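We can check capacities with the built-in "cap" function. The sketch below (the "capacities" and "handOff" functions are hypothetical names for this example) shows that a channel created without a size has a capacity of 0, and that a write to it completes once a reader is ready.

```go
package main

import "fmt"

// capacities reports the capacity of a channel made without a size
// and of one made with a size of 10.
func capacities() (int, int) {
	unbuffered := make(chan int)
	buffered := make(chan int, 10)
	return cap(unbuffered), cap(buffered)
}

// handOff sends a value through an unbuffered channel.
// The write happens in a goroutine because it blocks until the read below is ready.
func handOff(value int) int {
	ch := make(chan int)
	go func() {
		ch <- value
	}()
	return <-ch
}

func main() {
	u, b := capacities()
	fmt.Println(u, b)       // 0 10
	fmt.Println(handOff(3)) // 3
}
```

If the write in "handOff" were done directly in the same goroutine as the read, it would block forever, since nothing would be available to take the item.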

Putting an Item onto a Channel

To put an item onto a channel, we use the "<-" operator with the arrow pointing toward the channel variable. Here's how we can add the integer "3" to the channel "ch":

Go
    ch <- 3

Indicating how a channel will be used
One interesting thing about channels is that we can indicate how they will be used in a function. Here's a function declaration:

Go
    func calculateNextPrime(lastPrime int, ch chan<- int) {
      nextPrime := getNextPrime(lastPrime)
      ch <- nextPrime
    }

In the function parameter, we have a channel called "ch" that holds integers ("ch chan<- int"). But notice the arrow that is in the declaration. The arrow pointing toward "chan" indicates that this function will only put items onto a channel; it will not take items off of the channel.

Indicating a direction is not required; we can use a bi-directional channel as a parameter for a function. But by indicating a direction, this gives us some safety. If we try to take an item off of the channel in this function, we will get a compiler error.
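Here is a sketch that shows both directions. The arrow on the other side ("<-chan int") marks a parameter that can only be read from. The function names and the hard-coded values are assumptions for this example, and the "range" loop over the channel is a way of reading until the channel is closed.

```go
package main

import "fmt"

// producePrimes can only put items onto the channel (and close it).
func producePrimes(out chan<- int) {
	for _, p := range []int{2, 3, 5, 7} {
		out <- p
	}
	close(out)
	// v := <-out // would not compile: cannot receive from a send-only channel
}

// sumAll can only take items off of the channel.
func sumAll(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	// in <- 1 // would not compile: cannot send to a receive-only channel
	return total
}

// sumOfPrimes creates a bi-directional channel and hands each function its own view of it.
func sumOfPrimes() int {
	ch := make(chan int, 4)
	go producePrimes(ch)
	return sumAll(ch)
}

func main() {
	fmt.Println(sumOfPrimes()) // 17
}
```

The same bi-directional channel is passed to both functions; Go converts it to the restricted type for each parameter.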

Closing a Channel

When we are done writing to a channel, it's best if we close it. Here's how to close a channel (named "ch"):

Go
    close(ch)

Closing a channel does a couple of things for us. First, if we try to write to a channel that has been closed, we get a "panic". This is an illegal operation that will cause our application to exit with an error.

More interesting is what happens when we read from a closed channel. If the channel still contains items, then we can continue to take items off of the channel. But once the channel is empty, if we try to take an item, it will not block. We'll see a bit more of this in the next section.

Taking an Item off of a Channel

There are a couple of different ways to take an item off of a channel.

Note: this is often referred to as "reading" an item off of a channel, but it is a "read and remove" operation. There is no way to "peek" at an item on a channel (meaning, look at the item without removing it). The built-in "len" function reports how many items are currently buffered on a channel, but in concurrent code that number can be out of date by the time we use it, so it is rarely a good basis for decisions.

To read an item off of a channel, use the "<-" operator with the arrow pointed away from the channel.

Go
    var prime int
    prime = <-ch

This code reads an integer off of the channel and assigns it to the "prime" variable.

We can also combine the creation and assignment of a variable with the ":=" operator:

Go
    prime := <-ch

As mentioned above, if the channel is empty, these operations will block until an item is available. If an item is never added to the channel (and the channel remains open), the result is an operation that will "hang".

Reading from a closed channel
Also as mentioned above, empty channels behave a bit differently when the channel is closed. If we assume the channel "ch" is empty and closed, what happens with the following operation?

Go
    prime := <-ch

This operation does not block since the channel is closed. The variable "prime" will have the default value for an integer (which is 0). If we have a struct or another type, this may be an empty struct or a nil. This is not the best result. For safety, we would need to check to see if the value is valid before using it. But there is another option.

If we expect that a channel may be closed, we can use the following construct: 

Go
    prime, ok := <-ch
    if !ok {
      // channel is closed and "prime" is not valid
    }

Reading from a channel this way returns 2 values: the item on the channel and a Boolean (true/false) value to indicate if the read was successful. If the read is successful (such as reading a value), then the second value is "true". If the read is unsuccessful (such as reading from an empty, closed channel), then the value is "false".
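The following sketch shows why the second value matters: it lets us tell a real zero that was written to the channel apart from the default zero that comes back from an empty, closed channel. The "describeRead" function is a hypothetical helper written for this example.

```go
package main

import "fmt"

// describeRead takes one item off of ch and reports what happened.
func describeRead(ch <-chan int) string {
	value, ok := <-ch
	if !ok {
		// The channel is closed and empty; value is just the zero value.
		return "closed"
	}
	return fmt.Sprintf("value %d", value)
}

func main() {
	ch := make(chan int, 2)
	ch <- 3
	ch <- 0 // a real zero written by the producer
	close(ch)

	fmt.Println(describeRead(ch)) // value 3
	fmt.Println(describeRead(ch)) // value 0
	fmt.Println(describeRead(ch)) // closed
}
```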

Reading until a channel is closed
One common scenario is wanting to read all of the available values from a channel until the channel is closed. This can be done with a "for" loop, as in the following:

Go
    for {
      prime, ok := <-ch
      if !ok {
        break
      }
      fmt.Println(prime)
    }

This uses an infinite "for" loop (i.e., a "for" loop without a condition) -- for more information on loops, see the earlier article: Go (golang) Loops - A Unified "for". If the channel is closed and has no remaining items, then "ok" will be false, and the "break" exits out of the loop.

But we can also use a "for" with a "range" to read from a channel.

Go
    for prime := range ch {
      fmt.Println(prime)
    }

This uses a "range" on the channel and assigns each value to "prime". When the channel is closed and all of the remaining items have been read, the "for" loop exits. I find this version a bit easier to read.
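Putting the pieces together, here is a minimal end-to-end sketch: one goroutine writes to a channel and closes it, and the caller uses "range" to read until the channel is closed. The "produceSquares" and "collect" functions are hypothetical names chosen for this example.

```go
package main

import "fmt"

// produceSquares writes count values to ch and then closes it so that
// the reader knows no more items are coming.
func produceSquares(count int, ch chan<- int) {
	defer close(ch)
	for i := 1; i <= count; i++ {
		ch <- i * i
	}
}

// collect reads from ch until it is closed and empty.
func collect(ch <-chan int) []int {
	var values []int
	for v := range ch {
		values = append(values, v)
	}
	return values
}

func main() {
	ch := make(chan int)
	go produceSquares(4, ch)
	fmt.Println(collect(ch)) // [1 4 9 16]
}
```

If "produceSquares" did not close the channel, the "range" loop in "collect" would block forever waiting for another item.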

Relating Channels to C#

I haven't talked too much about C# at this point. When I first started looking at Go several years ago, I thought channels were pretty interesting, but there wasn't anything similar in C#. That changed with .NET Core 3.0.

C# now has a Channel<T> class. I won't go into detail about it here. I've been working on an example using my Digit Recognition application (https://github.com/jeremybytes/digit-display), and this will be a separate article in the near future.

But I will note a few examples from the project (specifically, from the "digit-console-channel/Program.cs" file).

C#
    var channel = Channel.CreateUnbounded<Prediction>();

The above code creates a channel that holds "Prediction" objects -- like Go, a channel can only hold one type of item. The "CreateUnbounded" means that it does not have a pre-defined capacity. We can also create a bounded channel that has a fixed capacity.

One difference is that C# channels have explicit "Reader" and "Writer" properties. Here's a function declaration that uses a "ChannelReader":

C#
    private static async Task Listen(
      ChannelReader<Prediction> reader,
      List<Prediction> log,
      bool mini = false)

This function indicates that it will only read from the channel. To call this function, we can use the "Reader" property on a channel variable:

C#
    var channel = Channel.CreateUnbounded<Prediction>();
    var listener = Listen(channel.Reader, log, mini);

Using a channel writer is similar to this.

In addition, we should close the channel when we're done writing to it. This is done with the "Complete" function on the "Writer".

C#
    private static async Task Produce(
      ChannelWriter<Prediction> writer,
      string[] rawData)
    {
      var prediction = getPrediction(rawData);
      await writer.WriteAsync(prediction);

      writer.Complete();
    }

The above is a simplified version of a function that uses a channel writer. After it is done writing with "WriteAsync" (yes, lots of stuff is async here), we can call the "Complete" function to close the channel.

As with Go, we can also read from a channel using a looping operation.

C#
    private static async Task Listen(
      ChannelReader<Prediction> reader,
      List<Prediction> log,
      bool mini = false)
    {
      await foreach (Prediction prediction in reader.ReadAllAsync())
      {
        // Display the result
        WriteOutput(prediction, mini);
      }
    }

This is also simplified. But we can see a "foreach" on the channel "Reader". As with the "Writer", a lot of things are async (which we won't go into here). The "ReadAllAsync" function returns an "IAsyncEnumerable" that we can "await foreach". It's a bit confusing until you get used to it.

This "await foreach" will wait for a new item to be available in the channel. It may need to pause until an item is available. Once the channel writer is "Complete" (i.e., closed), then the foreach loop will exit.

The C# version of channels is a bit more involved than the Go version (particularly with the async parts in C#), but the same concepts are there: creating a channel, writing to it, reading from it, and closing it. I've been spending a bit of time exploring C# channels, and I'm looking forward to writing about them in more detail.

Update: If you're interested in learning more about C# Channels, here's an article: Introduction to Channels in C#.

When to Close a Channel

One problem that may come up in our code is that we may not have a good place to close a channel. For example, consider the following Go code:

Go
    func fetchPersonToChannel(id int, ch chan<- person) {
      p, err := getPerson(id)
      if err != nil {
        log.Printf("err calling getPerson(%d): %v", id, err)
        return
      }
      ch <- p
    }

This function makes a service call (with "getPerson") and writes the results to a channel. This function is called multiple times in a "for" loop:

Go
    for _, id := range ids {
      go fetchPersonToChannel(id, ch)
    }

This loops through a collection and calls the "fetchPersonToChannel" function concurrently (notice the "go").

So when is it safe to close the channel?

If we close the channel immediately after the "for" loop, we will run into problems. The concurrent operations are not complete at that point, so we would end up closing the channel before we are done with it. This will result in a "panic" when those operations try to write to the closed channel.

One answer to this problem is to use a WaitGroup. This is a sort of reference counter where we can keep track of running operations.

And we'll explore WaitGroup in the next article.
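As a brief preview, here is one possible sketch of how a WaitGroup from the standard "sync" package can tell us when it is safe to close the channel. This is an assumption about how the pieces could fit together, not necessarily the approach from the next article: the "person" type, the "getPerson" body, the extra "wg" parameter, and the "fetchAll" function are all hypothetical.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// person and getPerson are hypothetical stand-ins for the service call.
type person struct {
	id   int
	name string
}

func getPerson(id int) (person, error) {
	if id <= 0 {
		return person{}, fmt.Errorf("invalid id %d", id)
	}
	return person{id: id, name: fmt.Sprintf("person-%d", id)}, nil
}

// fetchPersonToChannel signals the WaitGroup when it finishes, whether
// or not it wrote anything to the channel.
func fetchPersonToChannel(id int, ch chan<- person, wg *sync.WaitGroup) {
	defer wg.Done()
	p, err := getPerson(id)
	if err != nil {
		log.Printf("err calling getPerson(%d): %v", id, err)
		return
	}
	ch <- p
}

// fetchAll starts one goroutine per id and closes the channel only
// after every goroutine has finished.
func fetchAll(ids []int) []person {
	ch := make(chan person)
	var wg sync.WaitGroup

	for _, id := range ids {
		wg.Add(1)
		go fetchPersonToChannel(id, ch, &wg)
	}

	// Wait in a separate goroutine so the loop below can keep reading.
	go func() {
		wg.Wait()
		close(ch)
	}()

	var people []person
	for p := range ch {
		people = append(people, p)
	}
	return people
}

func main() {
	people := fetchAll([]int{1, 2, -1, 3})
	fmt.Println(len(people)) // 3 (the invalid id is logged and skipped)
}
```

The items can arrive in any order, so this sketch only prints how many were received.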


Wrap Up

Channels in Go are really interesting. Since a channel is fairly limited in what we can do with it, there's not a whole lot that we need to learn to be proficient.

The biggest stumbling block that I've come across when using channels is inadvertently blocking operations. There have been times when a channel capacity is not big enough, and I have blocked my code trying to add to the channel. And there have been times where I have not closed a channel, and I have blocked my code trying to read from a channel.

But once I got the hang of it, things started to go much more smoothly. And I've taken what I've learned from channels in Go and applied much of it to using Channel<T> in C#. They are not exactly equivalent because Go and C# handle concurrent code quite a bit differently, but there are definitely ideas we can share. We'll explore channels in C# in an upcoming article.

Until then, keep exploring.

Happy Coding!