In the last article, we looked at using channels in Go to get data from
one concurrent operation to another (Go (golang) Channels - Moving Data Between Concurrent Processes). But we also ran across a problem: How do we know when a concurrent
operation is complete? One answer is that we can use a WaitGroup.
A WaitGroup is a counter that we can add to and subtract from. WaitGroup.Wait() is a blocking operation that waits until the counter reaches zero.
By using a WaitGroup, we can signal that a concurrent operation is complete so
that we can continue processing.
Concurrency in Go is a bit different from concurrency in C#. Today, we'll look
at why we need a WaitGroup in Go and how we can use it in our applications.
Motivation: I have been using C# as a primary language for 15
years. Exploring other languages gives us insight into different ways of
handling programming tasks. Even though Go (golang) is not my production
language, I've found several features and conventions to be interesting. By
looking at different paradigms, we can grow as programmers and become more
effective in our primary language.
Resources: For links to a video walkthrough, CodeTour, and
additional articles, see A Tour of Go for the C# Developer.
The Problem
In the last article, we saw a potential problem when it came to using channels
and concurrent operations. Let's look at that example.
We have a function that gets data from a service (a "person") and puts it onto
a channel.
Go
func fetchPersonToChannel(id int, ch chan<- person) {
	p, err := getPerson(id)
	if err != nil {
		log.Printf("err calling getPerson(%d): %v", id, err)
		return
	}
	ch <- p
}
The last line of this function puts the person ("p") onto the channel ("ch").
For more information on writing to a channel, see the
previous article.
In the main function of the application, we have a "for" loop that calls this
function multiple times, and we have a "for" loop that reads from the channel
and displays the data.
Go
ch := make(chan person, 10)

// put values onto a channel
for _, id := range ids {
	go fetchPersonToChannel(id, ch)
}

// read values from the channel
for p := range ch {
	fmt.Printf("%d: %v\n", p.ID, p)
}
The first "for" loop iterates through the "ids" collection and calls the
"fetchPersonToChannel" function. Notice that this is a goroutine (using the
"go" keyword), which means that the loop continues without waiting for each
call to "fetchPersonToChannel" to complete. This creates multiple concurrent
operations.
The second "for" loop reads values from the channel until the channel is
closed. It takes the value from the channel and outputs it to the console.
For more information on reading from a channel, see the
previous article.
But there's a problem...
The channel is not closed anywhere in the code. As a reminder, when we read
from an empty, open channel, the operation blocks until there is an item to
read.
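Since the channel is never closed, the second "for" loop will block
indefinitely. (If no other goroutine can make progress, the Go runtime detects this and stops the program with "fatal error: all goroutines are asleep - deadlock!")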
Can we just close the channel?
Since the channel needs to be closed, can we just close it? Let's consider the
following code:
Go
ch := make(chan person, 10)

// put values onto a channel
for _, id := range ids {
	go fetchPersonToChannel(id, ch)
}
close(ch)

// read values from the channel
for p := range ch {
	fmt.Printf("%d: %v\n", p.ID, p)
}
This is the same as above except we have a "close(ch)" after the first "for"
loop. Unfortunately, this code will not work as expected.
Since the goroutines (i.e., the concurrent calls to "fetchPersonToChannel")
take a little bit of time to complete, the channel will be closed before the
goroutines finish. The result is that the "fetchPersonToChannel" function will
try to write to a closed channel. And this will cause a panic (similar to an
unhandled runtime exception in C#).
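A quick way to confirm the panic is to send on a channel that has already been closed and catch it with "recover". This is a sketch for illustration only (the function name "sendAfterClose" is hypothetical); in real code, the fix is to avoid the send, not to recover from it.

```go
package main

import "fmt"

// sendAfterClose closes a channel, then sends on it. The deferred recover
// captures the resulting panic and returns its message.
func sendAfterClose() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	ch := make(chan int, 10)
	close(ch)
	ch <- 1 // panics at runtime: the channel is closed
	return "no panic"
}

func main() {
	fmt.Println(sendAfterClose()) // send on closed channel
}
```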
We need to wait for the goroutines to finish before we close the channel. And
that's where "WaitGroup" comes in.
WaitGroup
"WaitGroup" is a type in the "sync" package. It has limited functionality
(just 3 methods when this article was written; Go 1.25 later added a convenience method, "Go"), but it will help us solve our issue.
WaitGroup Members
- Add(int) -- Adds the parameter's value to the counter (generally "1").
- Done() -- Decrements the counter by one.
- Wait() -- Blocks until the counter is 0.
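As a minimal sketch of how the three methods fit together (the "work" and "runWorkers" names are hypothetical, and an atomic counter stands in for real work), each goroutine calls "Done" when it finishes, and "Wait" returns only after all of them have done so:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// work is a hypothetical unit of work: it records that it ran,
// then signals the WaitGroup that it is finished.
func work(finished *int64, wg *sync.WaitGroup) {
	defer wg.Done()              // decrement the counter when work returns
	atomic.AddInt64(finished, 1) // safe to call from many goroutines at once
}

// runWorkers starts n goroutines and waits for all of them to finish.
func runWorkers(n int) int64 {
	var finished int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // increment before starting the goroutine
		go work(&finished, &wg)
	}
	wg.Wait() // block until the counter is back to 0
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runWorkers(9)) // 9
}
```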
Let's put these into action.
WaitGroup.Add() & WaitGroup.Wait()
Here's an update to the block of code with the 2 "for" loops:
Go
ch := make(chan person, 10)
var wg sync.WaitGroup

// put values onto a channel
for _, id := range ids {
	wg.Add(1)
	go fetchPersonToChannel(id, ch, &wg)
}
wg.Wait()
close(ch)

// read values from the channel
for p := range ch {
	fmt.Printf("%d: %v\n", p.ID, p)
}
The second line declares a "WaitGroup" variable named "wg".
Inside the first "for" loop, we use "Add(1)" to increment the counter. Since
the "ids" collection has 9 values, the counter will quickly increment to "9".
Also inside the first "for" loop, we have changed the signature of the
"fetchPersonToChannel" function. It now takes an additional parameter: a
pointer to a WaitGroup (we'll see the updated function in just a bit). We
need to pass a pointer so that the function updates the same counter that
"main" is waiting on; passing the WaitGroup by value would give the function
its own copy. (Pointers are a bigger topic that we won't go into today.)
After the first "for" loop, we call "Wait()" on our WaitGroup variable. This
will block until the WaitGroup counter reaches zero. (We'll see how the
counter gets decremented in just a bit.)
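After the WaitGroup reaches zero, we close the channel. This ensures that the
channel is not closed until after all of the goroutines are done (and we are
done writing to the channel). This ordering works here because the channel has
a buffer of 10, which is enough to hold all 9 results while "main" is blocked
on "Wait()". If there were more results than buffer space, the extra goroutines
would block on their sends, "Wait()" would never return, and the program would
deadlock.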
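The version above depends on the channel buffer being large enough to hold every result before anything is read. A common variation, sketched here, waits and closes from a separate goroutine so that "main" can start reading right away; it then works even with an unbuffered channel. The "produce" and "collect" names are hypothetical stand-ins (ints instead of "person" values), and the closing goroutine is an anonymous function, which is covered in the next article.

```go
package main

import (
	"fmt"
	"sync"
)

// produce is a hypothetical stand-in for fetchPersonToChannel.
func produce(id int, ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	ch <- id * id
}

// collect reads results while the producers are still running.
func collect(ids []int) int {
	ch := make(chan int) // unbuffered: each send waits for a reader
	var wg sync.WaitGroup

	for _, id := range ids {
		wg.Add(1)
		go produce(id, ch, &wg)
	}

	// Close the channel only after every producer is done, but do the
	// waiting in another goroutine so the loop below can start reading now.
	go func() {
		wg.Wait()
		close(ch)
	}()

	sum := 0
	for v := range ch { // ends once the channel is closed and drained
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(collect([]int{1, 2, 3, 4})) // 30
}
```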
WaitGroup.Done()
We decrement the counter inside the "fetchPersonToChannel" function. Here's
the updated function:
Go
func fetchPersonToChannel(id int, ch chan<- person, wg *sync.WaitGroup) {
	defer wg.Done()
	p, err := getPerson(id)
	if err != nil {
		log.Printf("err calling getPerson(%d): %v", id, err)
		return
	}
	ch <- p
}
We added a third parameter to the function signature: a pointer to a WaitGroup
called "wg".
The first line of the function is "defer wg.Done()". The "Done" function will
decrement the counter.
More important is the "defer". As we saw in a prior article (Go (golang) defer - A Better “finally”), "defer" is similar to a "finally" in C# -- it will run this line of code
before the function exits.
This function can exit in 2 ways: (1) if there is an error, the error is
logged and the function exits; (2) if there is no error, the data is put onto
the channel and the function exits. Either way, the "defer" will run and
"Done" will be called on the WaitGroup.
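To check that the deferred "Done" really does run on the error path, here is a runnable sketch with the same shape as "fetchPersonToChannel". The "lookup" function is a hypothetical stand-in for "getPerson" that fails for odd IDs, and the results are plain ints rather than "person" values. If the failing goroutines skipped "Done", "fetchAll" would never return.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errOdd is a hypothetical error used to simulate a failed service call.
var errOdd = errors.New("odd id")

// lookup is a hypothetical stand-in for getPerson: it fails for odd ids.
func lookup(id int) (int, error) {
	if id%2 != 0 {
		return 0, errOdd
	}
	return id * 10, nil
}

// fetch may return early on error, but the deferred wg.Done()
// runs on every exit path.
func fetch(id int, ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	v, err := lookup(id)
	if err != nil {
		return // early exit: Done still runs because of the defer
	}
	ch <- v
}

// fetchAll returns how many lookups succeeded.
func fetchAll(ids []int) int {
	ch := make(chan int, len(ids)) // buffer large enough for every result
	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go fetch(id, ch, &wg)
	}
	wg.Wait() // returns only because every goroutine called Done
	close(ch)

	count := 0
	for range ch {
		count++
	}
	return count
}

func main() {
	fmt.Println(fetchAll([]int{1, 2, 3, 4, 5, 6})) // 3
}
```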
Program Flow
Here's the overall flow:
- When the first "for" loop runs, the counter is incremented (with "wg.Add") before each goroutine is started. In this case, it quickly increases the counter to 9.
- The code hits the "wg.Wait()" call and pauses.
- Inside each goroutine, the counter is decremented (with "wg.Done"). The counter decreases until it reaches zero.
- When the counter reaches zero, "wg.Wait()" stops waiting and the channel is closed.
- The second "for" loop reads items from the channel.
- Since the channel is closed, the for loop will exit once all of the values have been read from the channel.
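Since "Add" takes an int, another option is to set the counter once before the loop. The "sync" documentation notes that calls to "Add" with a positive value made while the counter is zero must happen before "Wait"; that is why the counter is incremented before each goroutine starts rather than inside it. The sketch below (with a hypothetical "square" worker that writes into its own slot of a slice) uses "wg.Add(n)" up front:

```go
package main

import (
	"fmt"
	"sync"
)

// square is a hypothetical worker; each goroutine writes a different index,
// so no two goroutines touch the same slice element.
func square(i int, results []int, wg *sync.WaitGroup) {
	defer wg.Done()
	results[i] = i * i
}

func squares(n int) []int {
	results := make([]int, n)
	var wg sync.WaitGroup
	wg.Add(n) // set the counter once, before any goroutine starts
	for i := 0; i < n; i++ {
		go square(i, results, &wg)
	}
	wg.Wait() // after Wait returns, every write to results is visible here
	return results
}

func main() {
	fmt.Println(squares(5)) // [0 1 4 9 16]
}
```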
Exiting Too Early
So far, our problem has been a blocking operation that hangs the application.
But with concurrent operations, we may have the opposite problem: our
application may exit before a concurrent operation has finished. Let's consider
an example:
Go
package main

import (
	"fmt"
	"log"
	"time"
)

func logMessages(count int) {
	for i := 0; i < count; i++ {
		log.Printf("Logging item #%d\n", i)
		time.Sleep(1 * time.Second)
	}
}

func main() {
	go logMessages(10)
	time.Sleep(3 * time.Second)
	fmt.Println("Done")
}
In this code, we have a "logMessages" function that logs a number of
messages. By default, these messages go to the console.
Notice the "time.Sleep()" function call. This pauses for 1 second between
each log message. So, if we call the "logMessages" function with a parameter of 10,
this function will take (at least) 10 seconds to complete.
In the "main" function, we call "logMessages" concurrently (using "go"). Then,
the function sleeps for 3 seconds, and finally, it prints "Done" to the
console.
After this, the application exits. Here is a sample output:
Console:
PS C:\GoDemo\waiting> .\waiting.exe
2021/02/02 07:25:37 Logging item #0
2021/02/02 07:25:38 Logging item #1
2021/02/02 07:25:39 Logging item #2
Done
2021/02/02 07:25:40 Logging item #3
PS C:\GoDemo\waiting>
In this run, the logging function has a chance to output 4 messages before the
application exits. (It's interesting that the "Done" is printed before the
last message, but that's just part of the "fun" when dealing with
concurrency.)
Keeping the Application Alive
We can use a WaitGroup to keep the application alive until the concurrent operation completes. Here's the same application with a
WaitGroup added:
Go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

func logMessages(count int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < count; i++ {
		log.Printf("Logging item #%d\n", i)
		time.Sleep(1 * time.Second)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go logMessages(10, &wg)
	time.Sleep(3 * time.Second)
	wg.Wait()
	fmt.Println("Done")
}
The "logMessages" function now takes a pointer to a WaitGroup as a parameter. We also have a new line at the start of the function: "defer wg.Done()". This calls "Done" once the function is complete.
In the "main" function, we added a WaitGroup variable (called "wg") and
immediately call "Add(1)". Then we call "logMessages" concurrently (passing in
the WaitGroup as a parameter).
After sleeping for 3 seconds, we call "Wait" on the WaitGroup. This blocks the application until the "logMessages" function is complete.
Finally, we print "Done" and exit.
In the output, we can see that this application waits for the concurrent
operation to complete before exiting:
Console:
PS C:\GoDemo\waiting> .\waiting.exe
2021/02/02 07:31:27 Logging item #0
2021/02/02 07:31:28 Logging item #1
2021/02/02 07:31:29 Logging item #2
2021/02/02 07:31:30 Logging item #3
2021/02/02 07:31:31 Logging item #4
2021/02/02 07:31:32 Logging item #5
2021/02/02 07:31:33 Logging item #6
2021/02/02 07:31:34 Logging item #7
2021/02/02 07:31:35 Logging item #8
2021/02/02 07:31:36 Logging item #9
Done
PS C:\GoDemo\waiting>
With the WaitGroup in place, we can keep the application running until all of the goroutines have a chance to finish.
Different from C#
This process is quite a bit different from C#, and that mainly has to do with
the way concurrent operations are coded. In C#, the concurrent operations
can be represented with a Task or a set of Tasks. Then we either "await" the Task or use "Task.WaitAll" to
pause until all of the Tasks have completed.
But in Go, we do not have Tasks. Instead we have goroutines, and a goroutine
does not give the caller a way to tell that it has completed; there is no way
to "await" a goroutine. So we need to use something like a WaitGroup.
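For readers coming from C#, here is a rough analogue of "Task.WaitAll" built on a WaitGroup. The "waitAll" helper is hypothetical (it is not part of the standard library), and it uses anonymous functions, which are covered in the next article.

```go
package main

import (
	"fmt"
	"sync"
)

// waitAll is a hypothetical helper, loosely analogous to C#'s Task.WaitAll:
// it runs each function in its own goroutine and blocks until all return.
func waitAll(funcs ...func()) {
	var wg sync.WaitGroup
	wg.Add(len(funcs))
	for _, f := range funcs {
		go func(f func()) { // pass f as a parameter so each goroutine gets its own copy
			defer wg.Done()
			f()
		}(f)
	}
	wg.Wait()
}

func main() {
	results := make([]int, 3)
	// Each function writes to a different index, so there is no data race.
	waitAll(
		func() { results[0] = 1 },
		func() { results[1] = 2 },
		func() { results[2] = 3 },
	)
	fmt.Println(results) // [1 2 3]
}
```

Unlike a C# Task, nothing here carries a return value or an exception back to the caller; any results have to be written somewhere the caller can read after "waitAll" returns.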
These different approaches have pros and cons. Tasks in C# are very flexible
and powerful, and they have a lot of features. But they are also complex and
take time to fully understand.
Goroutines in Go do not have many features; mostly, we can kick off a
concurrent operation. The small feature set is nice because that's all we have
to learn: just put a "go" in front of a function call and it runs concurrently. But
since we cannot know when a goroutine has completed, we need to resort to
external items like a WaitGroup to know when things are done.
Anonymous Functions
This code is not complete yet. In Go, it is very common to use anonymous
functions for goroutines. An anonymous function is a way to inline function
code.
Like lambda expressions and anonymous delegates in C#, anonymous functions in
Go can capture variables that are in scope (known as "closures" in Go and other
languages; in C#, we usually call them "captured variables").
Anonymous functions simplify some things in the sample we've looked at. By
using captured variables, we can remove some parameters from the function
signature, and we can also do away with the need to reference pointers.
I won't show what that code looks like here. We'll look at anonymous functions
in the next article.
Update: Here's the article: Go (golang) Anonymous Functions - Inlining Code for Goroutines.
Wrap Up
"WaitGroup" in Go is very useful. Since we cannot directly know when a
goroutine has completed, we can use a WaitGroup (with "Add()" and "Done()") to
keep track. We can use the WaitGroup to pause our application using "Wait()".
Once all of the concurrent operations are complete, the WaitGroup will stop
blocking, and the rest of our code will run.
As I've mentioned before, I find it really interesting to explore different
approaches to programming problems. In this case, Go and C# have fairly
different approaches to concurrent programming. But we can learn from both
languages and expand the way that we think about our solutions.
Happy Coding!