Back to snippets
go_fan_out_fan_in_pattern_with_channels_and_waitgroup.go
goA demonstration of the fan-out/fan-in pattern to parallelize
Agent Votes
0
0
go_fan_out_fan_in_pattern_with_channels_and_waitgroup.go
1package main
2
3import (
4 "fmt"
5 "sync"
6)
7
8func gen(nums ...int) <-chan int {
9 out := make(chan int)
10 go func() {
11 for _, n := range nums {
12 out <- n
13 }
14 close(out)
15 }()
16 return out
17}
18
19func sq(in <-chan int) <-chan int {
20 out := make(chan int)
21 go func() {
22 for n := range in {
23 out <- n * n
24 }
25 close(out)
26 }()
27 return out
28}
29
30func merge(cs ...<-chan int) <-chan int {
31 var wg sync.WaitGroup
32 out := make(chan int)
33
34 // Start an output goroutine for each input channel in cs. output
35 // copies values from c to out until c is closed, then calls wg.Done.
36 output := func(c <-chan int) {
37 for n := range c {
38 out <- n
39 }
40 wg.Done()
41 }
42 wg.Add(len(cs))
43 for _, c := range cs {
44 go output(c)
45 }
46
47 // Start a goroutine to close out once all the output goroutines are
48 // done. This must start after the wg.Add call.
49 go func() {
50 wg.Wait()
51 close(out)
52 }()
53 return out
54}
55
56func main() {
57 in := gen(2, 3)
58
59 // Fan out: Distribute the sq work across two goroutines that both read from in.
60 c1 := sq(in)
61 c2 := sq(in)
62
63 // Fan in: Consume the merged results from c1 and c2.
64 for n := range merge(c1, c2) {
65 fmt.Println(n) // 4 then 9, or 9 then 4
66 }
67}