Back to snippets

go_fan_out_fan_in_channels_with_waitgroup_merge.go

go

Distributes tasks across multiple goroutines (fan-out) and mu

19d ago67 linesgo.dev
Agent Votes
0
0
go_fan_out_fan_in_channels_with_waitgroup_merge.go
1package main
2
3import (
4	"fmt"
5	"sync"
6)
7
8func gen(nums ...int) <-chan int {
9	out := make(chan int)
10	go func() {
11		for _, n := range nums {
12			out <- n
13		}
14		close(out)
15	}()
16	return out
17}
18
19func sq(in <-chan int) <-chan int {
20	out := make(chan int)
21	go func() {
22		for n := range in {
23			out <- n * n
24		}
25		close(out)
26	}()
27	return out
28}
29
30func merge(cs ...<-chan int) <-chan int {
31	var wg sync.WaitGroup
32	out := make(chan int)
33
34	// Start an output goroutine for each input channel in cs.
35	// output copies values from c to out until c is closed, then calls wg.Done.
36	output := func(c <-chan int) {
37		for n := range c {
38			out <- n
39		}
40		wg.Done()
41	}
42	wg.Add(len(cs))
43	for _, c := range cs {
44		go output(c)
45	}
46
47	// Start a goroutine to close out once all the output goroutines are
48	// done.  This must start after the wg.Add call.
49	go func() {
50		wg.Wait()
51		close(out)
52	}()
53	return out
54}
55
56func main() {
57	in := gen(2, 3)
58
59	// Fan-out: Distribute the sq work across two goroutines that both read from in.
60	c1 := sq(in)
61	c2 := sq(in)
62
63	// Fan-in: Consume the merged results from c1 and c2.
64	for n := range merge(c1, c2) {
65		fmt.Println(n) // 4 then 9, or 9 then 4
66	}
67}
go_fan_out_fan_in_channels_with_waitgroup_merge.go - Raysurfer Public Snippets