Sunday, January 31, 2021

Go (golang) Channels - Moving Data Between Concurrent Processes

Go has concurrency baked into the language -- concurrent operations are referred to as "goroutines". But concurrency is a little more complicated than running multiple operations concurrently. We often need to move data from one operation to another. For example, we may have an operation that produces data and another operation that displays or processes that data. One way of doing this in Go is by using a channel.
A channel is a queue that allows us to send data between concurrent operations.
Let's take a look at channels in Go to see how we can use them to communicate between concurrent operations. And we'll do a quick comparison to Channel<T> in C#.

Motivation: I have been using C# as a primary language for 15 years. Exploring other languages gives us insight into different ways of handling programming tasks. Even though Go (golang) is not my production language, I've found several features and conventions to be interesting. By looking at different paradigms, we can grow as programmers and become more effective in our primary language.

Resources: For links to a video walkthrough, CodeTour, and additional articles, see A Tour of Go for the C# Developer.

Go Channels

A channel in Go only has a few operations. We can put an item onto a channel, and we can take an item off of a channel -- similar to "enqueue" and "dequeue" on a Queue in C#.

If a channel is full (meaning, it has reached capacity), then trying to add an item to the channel will block until there is space available. If a channel is empty, then trying to take an item off of the channel will block until there is an item to read. We'll see how this affects things in just a bit.

Before using a channel, we create it (using "make") and decide the type that it holds and how many items it can hold. After we're done putting things onto the channel, we can close it. This will indicate to anyone pulling items off of the channel that there will be no more items.

Let's look at each of these features more closely.

Creating a Channel

To create a channel, we use the built-in "make" function. Here's an example of creating a channel that holds an integer:

Go
    ch := make(chan int)

By default, a channel is unbuffered: it has a capacity of 0 and cannot hold an item on its own. A send on an unbuffered channel blocks until another goroutine is ready to receive the item, and a receive blocks until another goroutine sends one. With a buffered channel (one created with a capacity), a send blocks only when the channel is full. (Space is made available when an operation takes an item off of the channel.)

We can also pass a capacity to the "make" function. The following code creates a channel that can hold 10 integers:

Go
    ch := make(chan int, 10)

Managing the capacity of a channel is important. When I was first starting, I created channels without specifying a capacity. This led to blocking operations that were difficult to debug. Make sure that the channel is big enough to keep things flowing.
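To see the effect of capacity, here is a minimal sketch. The "fillBuffer" helper is a hypothetical name used only for this illustration. It sends as many items as the channel can hold before anything reads from it, which works only because the buffer has room for all of them.

```go
package main

import "fmt"

// fillBuffer is a hypothetical helper for this sketch. It creates a channel
// with the given capacity, sends that many values while nothing is receiving,
// and then reads them back in order.
func fillBuffer(capacity int) []int {
	ch := make(chan int, capacity)

	// These sends do not block because the buffer has room for all of them.
	for i := 1; i <= capacity; i++ {
		ch <- i
	}
	// One more send at this point would block, since the buffer is full
	// and nothing is receiving yet.

	received := make([]int, 0, capacity)
	for i := 0; i < capacity; i++ {
		received = append(received, <-ch)
	}
	return received
}

func main() {
	fmt.Println(fillBuffer(3)) // prints [1 2 3]
}
```

Items come off of the channel in the same order that they went on.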

Putting an Item onto a Channel

To put an item onto a channel, we use the "<-" operator with the arrow pointing toward the channel variable. Here's how we can add the integer "3" to the channel "ch":

Go
    ch <- 3
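
A send is only half of the exchange; some other operation has to take the item off of the channel. Here is a minimal sketch that pairs a send in one goroutine with a receive in another. The "sendAndReceive" helper is a hypothetical name for this illustration, and the receive syntax it uses is covered later in this article.

```go
package main

import "fmt"

// sendAndReceive is a hypothetical helper for this sketch. It sends one value
// from a separate goroutine and receives it on the calling goroutine.
func sendAndReceive(value int) int {
	ch := make(chan int)

	// The send runs in its own goroutine so that the receive below
	// can pair up with it.
	go func() {
		ch <- value
	}()

	return <-ch
}

func main() {
	fmt.Println(sendAndReceive(3)) // prints 3
}
```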

Indicating how a channel will be used
One interesting thing about channels is that we can indicate how they will be used in a function. Here's a function declaration:

Go
    func calculateNextPrime(lastPrime int, ch chan<- int) {
      nextPrime := getNextPrime(lastPrime)
      ch <- nextPrime
    }

In the function parameter, we have a channel called "ch" that holds integers ("ch chan<- int"). But notice the arrow that is in the declaration. The arrow pointing toward "chan" indicates that this function will only put items onto a channel; it will not take items off of the channel.

Indicating a direction is not required; we can use a bi-directional channel as a parameter for a function. But by indicating a direction, this gives us some safety. If we try to take an item off of the channel in this function, we will get a compiler error.
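
As a rough sketch of both directions working together, the following program has one function that can only send and one that can only receive. The function names ("produce", "sum", "sumOfSquares") are hypothetical and chosen for this example. It also uses "close" and "range", which are covered in the sections below.

```go
package main

import "fmt"

// produce can only send on ch. Trying to receive from ch inside this
// function is a compile-time error.
func produce(count int, ch chan<- int) {
	for i := 1; i <= count; i++ {
		ch <- i * i
	}
	close(ch)
}

// sum can only receive from ch. Trying to send on ch inside this
// function is a compile-time error.
func sum(ch <-chan int) int {
	total := 0
	for v := range ch {
		total += v
	}
	return total
}

// sumOfSquares creates an ordinary bi-directional channel. Go converts it
// to the directional types when it is passed to produce and sum.
func sumOfSquares(count int) int {
	ch := make(chan int)
	go produce(count, ch)
	return sum(ch)
}

func main() {
	fmt.Println(sumOfSquares(3)) // prints 14
}
```

The channel itself is still bi-directional; the arrows only restrict what each function is allowed to do with it.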

Closing a Channel

When we are done writing to a channel, it's best if we close it. Here's how to close a channel (named "ch"):

Go
    close(ch)

Closing a channel does a couple of things for us. First, if we try to write to a channel that has been closed, we get a "panic". This is an illegal operation that will cause our application to exit with an error.

More interesting is what happens when we read from a closed channel. If the channel still contains items, then we can continue to take items off of the channel. But once the channel is empty, if we try to take an item, it will not block. We'll see a bit more of this in the next section.
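
Here is a small sketch of that behavior. The "drainAfterClose" helper is a hypothetical name for this illustration. It closes a channel that still holds two items and then reads three times.

```go
package main

import "fmt"

// drainAfterClose is a hypothetical helper for this sketch. It buffers two
// values, closes the channel, and then performs three receives.
func drainAfterClose() [3]int {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch)

	var got [3]int
	got[0] = <-ch // 10: sent before the close, still delivered
	got[1] = <-ch // 20: sent before the close, still delivered
	got[2] = <-ch // 0: the channel is closed and empty, so this returns right away
	return got
}

func main() {
	fmt.Println(drainAfterClose()) // prints [10 20 0]
}
```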

Taking an Item off of a Channel

There are a couple of different ways to take an item off of a channel.

Note: this is often referred to as "reading" an item off of a channel, but it is a "read and remove" operation. There is no way to "peek" at an item on a channel (meaning, look at the item without removing it). The built-in "len" function does report how many items are currently buffered on a channel, but in concurrent code that number can be out of date by the time we act on it, so it is rarely a good basis for decisions.

To read an item off of a channel, use the "<-" operator with the arrow pointed away from the channel.

Go
    var prime int
    prime = <-ch

This code reads an integer off of the channel and assigns it to the "prime" variable.

We can also combine the creation and assignment of a variable with the ":=" operator:

Go
    prime := <-ch

As mentioned above, if the channel is empty, these operations will block until an item is available. If an item is never added to the channel (and the channel remains open), the result is an operation that will "hang".

Reading from a closed channel
Also as mentioned above, empty channels behave a bit differently when the channel is closed. If we assume the channel "ch" is empty and closed, what happens with the following operation?

Go
    prime := <-ch

This operation does not block since the channel is closed. The variable "prime" will have the zero value for an integer (which is 0). For other types, we get that type's zero value, such as a struct with zeroed fields or a nil pointer. This is not the best result, because we cannot tell a real 0 from a closed channel. For safety, we would need to check to see if the value is valid before using it. But there is another option.

If we expect that a channel may be closed, we can use the following construct: 

Go
    prime, ok := <-ch
    if !ok {
      // channel is closed and "prime" is not valid
    }

Reading from a channel can return 2 values: the item on the channel and a Boolean (true/false) value to indicate if the read was successful. If the read is successful (such as reading a value), then the second value is "true". If the read is unsuccessful (such as reading from an empty, closed channel), then the value is "false".
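
The second value matters most when the zero value is also a legitimate item. In the sketch below (the "describeReceives" helper is a hypothetical name for this illustration), the channel holds a real 0 and is then closed. Both reads produce 0, and only "ok" tells them apart.

```go
package main

import "fmt"

// describeReceives is a hypothetical helper for this sketch. It puts a real
// zero on a channel, closes it, and then reports what two receives return.
func describeReceives() []string {
	ch := make(chan int, 1)
	ch <- 0 // a real value that happens to be zero
	close(ch)

	var results []string
	for i := 0; i < 2; i++ {
		v, ok := <-ch
		results = append(results, fmt.Sprintf("value=%d ok=%t", v, ok))
	}
	return results
}

func main() {
	for _, line := range describeReceives() {
		fmt.Println(line)
	}
	// prints:
	// value=0 ok=true
	// value=0 ok=false
}
```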

Reading until a channel is closed
One common scenario is wanting to read all of the available values from a channel until the channel is closed. This can be done with a "for" loop, as in the following:

Go
    for {
      prime, ok := <-ch
      if !ok {
        break
      }
      fmt.Println(prime)
    }

This uses an infinite "for" loop (i.e., a "for" loop without a condition) -- for more information on loops, see the earlier article: Go (golang) Loops - A Unified "for". Once the channel is closed and empty, "ok" will be false. The "break" exits out of the loop.

But we can also use a "for" with a "range" to read from a channel.

Go
    for prime := range ch {
      fmt.Println(prime)
    }

This uses a "range" on the channel and assigns the value to "prime". When the channel is closed and any remaining items have been read, the "for" loop exits. I find this version a bit easier to read.
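
To put the pieces together, here is a minimal sketch of a complete producer and consumer. The names "countTo" and "collect" are hypothetical and used only for this example. The producer closes the channel when it is finished, and that close is what lets the "range" loop end.

```go
package main

import "fmt"

// countTo is a hypothetical producer for this sketch. It starts a goroutine
// that sends 1 through n and then closes the channel.
func countTo(n int) <-chan int {
	ch := make(chan int)
	go func() {
		// Without this close, the range loop in collect would block
		// forever waiting for more values.
		defer close(ch)
		for i := 1; i <= n; i++ {
			ch <- i
		}
	}()
	return ch
}

// collect reads values until the channel is closed and empty.
func collect(ch <-chan int) []int {
	var values []int
	for v := range ch {
		values = append(values, v)
	}
	return values
}

func main() {
	fmt.Println(collect(countTo(5))) // prints [1 2 3 4 5]
}
```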

Relating Channels to C#

I haven't talked too much about C# at this point. When I first started looking at Go several years ago, I thought channels were pretty interesting, but there wasn't anything similar in C#. That changed with .NET Core 3.0.

C# now has a Channel<T> class. I won't go into detail about it here. I've been working on an example using my Digit Recognition application (https://github.com/jeremybytes/digit-display), and this will be a separate article in the near future.

But I will note a few examples from the project (specifically, from the "digit-console-channel/Program.cs" file).

C#
    var channel = Channel.CreateUnbounded<Prediction>();

The above code creates a channel that holds "Prediction" objects -- like Go, a channel can only hold one type of item. The "CreateUnbounded" means that it does not have a pre-defined capacity. We can also create a bounded channel that has a fixed capacity.

One difference is that C# channels have explicit "Reader" and "Writer" properties. Here's a function declaration that uses a "ChannelReader":

C#
    private static async Task Listen(
      ChannelReader<Prediction> reader,
      List<Prediction> log,
      bool mini = false)

This function indicates that it will only read from the channel. To call this function, we can use the "Reader" property on a channel variable:

C#
    var channel = Channel.CreateUnbounded<Prediction>();
    var listener = Listen(channel.Reader, log, mini);

Using a channel writer is similar to this.

In addition, we should close the channel when we're done writing to it. This is done with the "Complete" function on the "Writer".

C#
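    private static async Task Produce(
      ChannelWriter<Prediction> writer,
      string[] rawData)
    {
      var prediction = getPrediction(rawData);
      // Await the write so it finishes before the channel is closed.
      await writer.WriteAsync(prediction);

      // Signal that no more items will be written.
      writer.Complete();
    }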

The above is a simplified version of a function that uses a channel writer. After it is done writing (using "WriteAsync" (yes, lots of stuff is async here)), we can call the "Complete" function to close the channel.

As with Go, we can also read from a channel using a looping operation.

C#
    private static async Task Listen(
      ChannelReader<Prediction> reader,
      List<Prediction> log,
      bool mini = false)
    {
      await foreach (Prediction prediction in reader.ReadAllAsync())
      {
        // Display the result
        WriteOutput(prediction, mini);
      }
    }

This is also simplified. But we can see a "foreach" on the channel "Reader". As with the "Writer", a lot of things are async (which we won't go into here). The "ReadAllAsync" function returns an "IAsyncEnumerable" that we can "await foreach". It's a bit confusing until you get used to it.

This "await foreach" will wait for a new item to be available in the channel. It may need to pause until an item is available. Once the channel writer is "Complete" (i.e., closed), then the foreach loop will exit.

The C# version of channels is a bit more involved than the Go version (particularly with the async parts in C#), but the same concepts are there: creating a channel, writing to it, reading from it, and closing it. I've been spending a bit of time exploring C# channels, and I'm looking forward to writing about them in more detail.

Update: If you're interested in learning more about C# Channels, here's an article: Introduction to Channels in C#.

When to Close a Channel

One problem that may come up in our code is that we may not have a good place to close a channel. For example, consider the following Go code:

Go
    func fetchPersonToChannel(id int, ch chan<- person) {
      p, err := getPerson(id)
      if err != nil {
        log.Printf("err calling getPerson(%d): %v", id, err)
        return
      }
      ch <- p
    }

This function makes a service call (with "getPerson") and writes the results to a channel. This function is called multiple times in a "for" loop:

Go
    for _, id := range ids {
      go fetchPersonToChannel(id, ch)
    }

This loops through a collection and calls the "fetchPersonToChannel" function concurrently (notice the "go").

So when is it safe to close the channel?

If we close the channel immediately after the "for" loop, we will run into problems. The concurrent operations are not complete at that point, so we would end up closing the channel before we are done with it. This will result in a "panic" when those operations try to write to the closed channel.

One answer to this problem is to use a WaitGroup. This is a sort of reference counter where we can keep track of running operations.

And we'll explore WaitGroup in the next article.
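
As a preview, here is a rough sketch of how a WaitGroup from the standard "sync" package could be used here. This is one possible arrangement, not necessarily the one from the next article. The "person" type, the "getPerson" stand-in, the extra "wg" parameter, and the "fetchAll" function are all assumptions made up for this illustration.

```go
package main

import (
	"fmt"
	"log"
	"sort"
	"sync"
)

// person and getPerson are hypothetical stand-ins for the type and the
// service call used in this article.
type person struct {
	id   int
	name string
}

func getPerson(id int) (person, error) {
	if id <= 0 {
		return person{}, fmt.Errorf("invalid id %d", id)
	}
	return person{id: id, name: fmt.Sprintf("person-%d", id)}, nil
}

// fetchPersonToChannel takes an extra WaitGroup parameter (an assumption
// for this sketch) so it can report when it has finished.
func fetchPersonToChannel(id int, ch chan<- person, wg *sync.WaitGroup) {
	defer wg.Done() // runs on both the success path and the error path
	p, err := getPerson(id)
	if err != nil {
		log.Printf("err calling getPerson(%d): %v", id, err)
		return
	}
	ch <- p
}

// fetchAll starts one goroutine per id and closes the channel only after
// every one of them has finished.
func fetchAll(ids []int) []person {
	ch := make(chan person)
	var wg sync.WaitGroup

	for _, id := range ids {
		wg.Add(1)
		go fetchPersonToChannel(id, ch, &wg)
	}

	// Wait in a separate goroutine so the loop below can start reading.
	go func() {
		wg.Wait()
		close(ch)
	}()

	var people []person
	for p := range ch {
		people = append(people, p)
	}

	// Goroutines finish in no particular order, so sort for stable output.
	sort.Slice(people, func(i, j int) bool { return people[i].id < people[j].id })
	return people
}

func main() {
	for _, p := range fetchAll([]int{3, -1, 1, 2}) {
		fmt.Println(p.id, p.name)
	}
}
```

The "wg.Wait()" call sits in its own goroutine because the channel in this sketch has no buffer. If the main operation waited first and only read afterward, the senders would have no one to hand their items to, and everything would block.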


Wrap Up

Channels in Go are really interesting. Since a channel is fairly limited in what we can do with it, there's not a whole lot that we need to learn to be proficient.

The biggest stumbling block that I've come across when using channels is inadvertently blocking operations. There have been times when a channel capacity is not big enough, and I have blocked my code trying to add to the channel. And there have been times where I have not closed a channel, and I have blocked my code trying to read from a channel.

But once I got the hang of it, things started to go much more smoothly. And I've taken what I've learned from channels in Go and applied much of it to using Channel<T> in C#. They are not exactly equivalent because Go and C# handle concurrent code quite a bit differently, but there are definitely ideas we can share. We'll explore channels in C# in an upcoming article.

Until then, keep exploring.

Happy Coding!
