Monday, February 1, 2021

Go (golang) WaitGroup - Signal that a Concurrent Operation is Complete

In the last article, we looked at using channels in Go to get data from one concurrent operation to another (Go (golang) Channels - Moving Data Between Concurrent Processes). But we also ran across a problem: How do we know when a concurrent operation is complete? One answer is that we can use a WaitGroup.
A WaitGroup is a counter that we can add to and subtract from. WaitGroup.Wait() is a blocking operation that waits until the counter reaches zero.
By using a WaitGroup, we can signal that a concurrent operation is complete so that we can continue processing.

Concurrency in Go is a bit different from concurrency in C#. Today, we'll look at why we need a WaitGroup in Go and how we can use it in our applications.

Motivation: I have been using C# as a primary language for 15 years. Exploring other languages gives us insight into different ways of handling programming tasks. Even though Go (golang) is not my production language, I've found several features and conventions to be interesting. By looking at different paradigms, we can grow as programmers and become more effective in our primary language.

Resources: For links to a video walkthrough, CodeTour, and additional articles, see A Tour of Go for the C# Developer.

The Problem

In the last article, we saw a potential problem when it came to using channels and concurrent operations. Let's look at that example.

We have a function that gets data from a service (a "person") and puts it onto a channel.

Go
    func fetchPersonToChannel(id int, ch chan<- person) {
      p, err := getPerson(id)
      if err != nil {
        log.Printf("err calling getPerson(%d): %v", id, err)
        return
      }
      ch <- p
    }

The last line of this function puts the person ("p") onto the channel ("ch").

For more information on writing to a channel, see the previous article.

In the main function of the application, we have a "for" loop that calls this function multiple times, and we have a "for" loop that reads from the channel and displays the data.

Go
    ch := make(chan person, 10)

    // put values onto a channel
    for _, id := range ids {
      go fetchPersonToChannel(id, ch)
    }

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

The first "for" loop iterates through the "ids" collection and calls the "fetchPersonToChannel" function. Notice that this is a goroutine (using the "go" keyword), which means that the loop continues without waiting for each call to "fetchPersonToChannel" to complete. This creates multiple concurrent operations.

The second "for" loop reads values from the channel until the channel is closed. It takes the value from the channel and outputs it to the console.

For more information on reading from a channel, see the previous article.

But there's a problem...
The channel is not closed anywhere in the code. As a reminder, when we read from an empty, open channel, the operation blocks until there is an item to read.

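Since the channel is never closed, the second "for" loop will block indefinitely. (If no other goroutines are still running, the Go runtime detects this and stops the program with "fatal error: all goroutines are asleep - deadlock!")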

Can we just close the channel?
Since the channel needs to be closed, can we just close it? Let's consider the following code:

Go
    ch := make(chan person, 10)

    // put values onto a channel
    for _, id := range ids {
      go fetchPersonToChannel(id, ch)
    }

    close(ch)

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

This is the same as above except we have a "close(ch)" after the first "for" loop. Unfortunately, this code will not work as expected.

Since the goroutines (i.e., the concurrent calls to "fetchPersonToChannel") take a little bit of time to complete, the channel will most likely be closed before they finish. When a goroutine then tries to write to the closed channel, the program panics with "send on closed channel" (similar to an unhandled runtime exception in C#).
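To see that panic on its own, without depending on goroutine timing, here's a small sketch that closes a channel and then sends on it. The function names "sendOnClosed" and "reportPanic" are made up for illustration; "recover" is used only so the program can report the panic message instead of crashing.

```go
package main

import "fmt"

// reportPanic is the deferred function, so its direct call to recover()
// stops the panic. It stores the panic value in the caller's result.
func reportPanic(msg *string) {
	if r := recover(); r != nil {
		*msg = fmt.Sprint(r)
	}
}

// sendOnClosed closes a channel and then sends on it, which panics.
// It returns the recovered panic message.
func sendOnClosed() (msg string) {
	defer reportPanic(&msg)

	ch := make(chan int, 10) // free buffer space does not help once the channel is closed
	close(ch)
	ch <- 1 // panics: send on closed channel
	return "no panic"
}

func main() {
	fmt.Println("recovered:", sendOnClosed())
}
```

Running this prints "recovered: send on closed channel". In the article's code there is no "recover", so the same panic would crash the application.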

We need to wait for the goroutines to finish before we close the channel. And that's where "WaitGroup" comes in.

WaitGroup

"WaitGroup" is a type in the "sync" package. It has limited functionality (just 3 methods), but it will help us solve our issue.

WaitGroup Members
  • Add(int) -- Adds the parameter to the counter (generally "1"; a negative value decrements it).
  • Done() -- Decrements the counter by one.
  • Wait() -- Blocks until the counter is 0.
Let's put these into action.

WaitGroup.Add() & WaitGroup.Wait()

Here's an update to the block of code with the 2 "for" loops:

Go
    ch := make(chan person, 10)
    var wg sync.WaitGroup

    // put values onto a channel
    for _, id := range ids {
      wg.Add(1)
      go fetchPersonToChannel(id, ch, &wg)
    }

    wg.Wait()
    close(ch)

    // read values from the channel
    for p := range ch {
      fmt.Printf("%d: %v\n", p.ID, p)
    }

The second line declares a "WaitGroup" variable named "wg".

Inside the first "for" loop, we use "Add(1)" to increment the counter once for each goroutine we start. Since the "ids" collection has 9 values, the counter will quickly increment to "9".

Also inside the first "for" loop, we have changed the signature of the "fetchPersonToChannel" function. It now takes an additional parameter: a pointer to a WaitGroup (we'll see the updated function in just a bit). We need to pass a pointer so that the function updates the same WaitGroup that we are waiting on. If we passed the WaitGroup by value, each goroutine would call "Done" on its own copy, the original counter would never reach zero, and "Wait()" would block forever. (Pointers are a bigger topic that we won't go into today.)

After the first "for" loop, we call "Wait()" on our WaitGroup variable. This will block until the WaitGroup counter reaches zero. (We'll see how the counter gets decremented in just a bit.)

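After the WaitGroup reaches zero, we close the channel. This ensures that the channel is not closed until after all of the goroutines are done (and we are done writing to the channel). Note that this ordering works only because the channel has a buffer of 10, which is enough to hold all 9 values. With an unbuffered channel (or more ids than buffer slots), the goroutines would block on "ch <- p" because nothing is reading yet, the counter would never reach zero, and "wg.Wait()" would block forever.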

WaitGroup.Done()

We decrement the counter inside the "fetchPersonToChannel" function. Here's the updated function:

Go
    func fetchPersonToChannel(id int, ch chan<- person, wg *sync.WaitGroup) {
      defer wg.Done()
      p, err := getPerson(id)
      if err != nil {
        log.Printf("err calling getPerson(%d): %v", id, err)
        return
      }
      ch <- p
    }

We added a third parameter to the function signature: a pointer to a WaitGroup called "wg".

The first line of the function is "defer wg.Done()". The "Done" function will decrement the counter.

More important is the "defer". As we saw in a prior article (Go (golang) defer - A Better “finally”), "defer" is similar to a "finally" in C#: the deferred call runs when the function exits, no matter which path it takes to get there.

This function can exit in 2 ways: (1) if there is an error, the error is logged and the function exits; (2) if there is no error, the data is put onto the channel and the function exits. Either way, the "defer" will run and "Done" will be called on the WaitGroup.

Program Flow

Here's the overall flow:
  1. When the first "for" loop runs, the counter is incremented (with "wg.Add") before each goroutine is started. In this case, it quickly increases the counter to 9.
  2. The code hits the "wg.Wait()" call and pauses.
  3. Inside each goroutine, the counter is decremented (with "wg.Done"). The counter decreases until it reaches zero.
  4. When the counter reaches zero, "wg.Wait()" stops waiting and the channel is closed.
  5. The second "for" loop reads items from the channel.
  6. Since the channel is closed, the for loop will exit once all of the values have been read from the channel.

Exiting Too Early

So far, our problem has been a blocking operation that hangs the application. But with concurrent operations, we may have the opposite problem: our application may exit before a concurrent operation has finished. Let's consider an example:

Go
    package main

    import (
      "fmt"
      "log"
      "time"
    )

    func logMessages(count int) {
      for i := 0; i < count; i++ {
        log.Printf("Logging item #%d\n", i)
        time.Sleep(1 * time.Second)
      }
    }

    func main() {
      go logMessages(10) // runs concurrently; main does not wait for it
      time.Sleep(3 * time.Second)
      fmt.Println("Done")
    }

In this code, we have a "logMessages" function that logs a number of messages. By default, the standard logger writes these messages to standard error, so they show up in the console.

Notice the "time.Sleep()" function call. This pauses for 1 second between each log message. So, if we call the "logMessages" function with a parameter of 10, this function will take (at least) 10 seconds to complete.

In the "main" function, we call "logMessages" concurrently (using "go"). Then "main" sleeps for 3 seconds, and finally, it prints "Done" to the console.

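After this, "main" returns and the application exits. Go does not wait for other goroutines to finish when "main" returns; any that are still running are simply stopped. Here is a sample output: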

Console:
    PS C:\GoDemo\waiting> .\waiting.exe
    2021/02/02 07:25:37 Logging item #0
    2021/02/02 07:25:38 Logging item #1
    2021/02/02 07:25:39 Logging item #2
    Done
    2021/02/02 07:25:40 Logging item #3
    PS C:\GoDemo\waiting>

In this run, the logging function has a chance to output 4 messages before the application exits. (It's interesting that the "Done" is printed before the last message, but that's just part of the "fun" when dealing with concurrency.)

Keeping the Application Alive
We can use a WaitGroup to keep the application alive until the concurrent operation completes. Here's the same application with a WaitGroup added:

Go
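    package main

    import (
      "fmt"
      "log"
      "sync"
      "time"
    )

    func logMessages(count int, wg *sync.WaitGroup) {
      defer wg.Done() // decrement the counter when the function returns
      for i := 0; i < count; i++ {
        log.Printf("Logging item #%d\n", i)
        time.Sleep(1 * time.Second)
      }
    }

    func main() {
      var wg sync.WaitGroup
      wg.Add(1)
      go logMessages(10, &wg)
      time.Sleep(3 * time.Second)
      wg.Wait() // block until logMessages calls Done
      fmt.Println("Done")
    }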

The "logMessages" function now takes a pointer to a WaitGroup as a parameter. We also have a new line in the function (the first line) -- "defer wg.Done()". This calls "Done" once the function is complete.

In the "main" function, we added a WaitGroup variable (called "wg") and immediately call "Add(1)". Then we call "logMessages" concurrently (passing in the WaitGroup as a parameter).

After sleeping for 3 seconds, we call "Wait" on the WaitGroup. This blocks the application until the "logMessages" function is complete.

Finally, we print "Done" and exit.

In the output, we can see that this application waits for the concurrent operation to complete before exiting:

Console:
    PS C:\GoDemo\waiting> .\waiting.exe
    2021/02/02 07:31:27 Logging item #0
    2021/02/02 07:31:28 Logging item #1
    2021/02/02 07:31:29 Logging item #2
    2021/02/02 07:31:30 Logging item #3
    2021/02/02 07:31:31 Logging item #4
    2021/02/02 07:31:32 Logging item #5
    2021/02/02 07:31:33 Logging item #6
    2021/02/02 07:31:34 Logging item #7
    2021/02/02 07:31:35 Logging item #8
    2021/02/02 07:31:36 Logging item #9
    Done
    PS C:\GoDemo\waiting>

With the WaitGroup in place, we can keep the application running until all of the goroutines have a chance to finish.

Different from C#

This process is quite a bit different from C#, and that mainly has to do with the way concurrent operations are coded. In C#, the concurrent operations can be represented with a Task or a set of Tasks. Then we either "await" the Task or use "Task.WaitAll" to pause until all of the Tasks have completed.

But in Go, we do not have Tasks. Instead we have goroutines, and a goroutine gives us no built-in way to tell from the outside that it has completed. There is no way to "await" a goroutine. So we need to use something like a WaitGroup.

These different approaches have pros and cons. Tasks in C# are very flexible and powerful, and they have a lot of features. But they are also complex and take time to fully understand.

Goroutines in Go do not have many features; mostly, they let us kick off a concurrent operation. The limited feature set is nice because that's all we have to learn: just put a "go" in front of a function call and it runs concurrently. But since a goroutine cannot tell us when it has completed, we need to resort to external items like a WaitGroup to know when things are done.

Anonymous Functions

The code we've looked at is not quite idiomatic yet. In Go, it is very common to use anonymous functions for goroutines. An anonymous function lets us write a function's code inline, right where it is used.

Like lambda expressions and anonymous delegates in C#, anonymous functions in Go can capture variables that are in scope. In Go (and in many languages other than C#), these are known as "closures"; in C#, we usually call them "captured variables".

Anonymous functions simplify some things in the sample we've looked at. By using captured variables, we can remove some parameters from the function signature, and we can also do away with the need to reference pointers.

I won't show what that code looks like here. We'll look at anonymous functions in the next article.


Wrap Up

"WaitGroup" in Go is very useful. Since we cannot directly know when a goroutine has completed, we can use a WaitGroup (with "Add()" and "Done()") to keep track. We can use the WaitGroup to pause our application using "Wait()". Once all of the concurrent operations are complete, the WaitGroup will stop blocking, and the rest of our code will run.

As I've mentioned before, I find it really interesting to explore different approaches to programming problems. In this case, Go and C# have fairly different approaches to concurrent programming. But we can learn from both languages and expand the way that we think about our solutions.

Happy Coding!
