但是问题仍在继续,这里仍旧是因为我们预知通道接收次数,以及发送放空次数,所以可以写出这个顺序和次数,这仍旧是易碎的,本质上除了让我们学习了一下这种写法,与上面发生的无异。
我们需要一种方式,可以在未知goroutine数量,未知通道大小的情况下,随时按需阻止下游阶段发送未发送完毕的通道。
因为接收操作在一个封闭的通道可以总是立即执行,产生类元素的零值。
这就意味着main函数能够对所有被done通道关闭的发送者解除阻塞。这实际上是一个广播信号发送者。我们扩展管道功能的each来接收done作为一个参数来安排通过defer来延迟关闭,以便所有的main函数的返回路径能够发送信号到管道阶段去退出。
先看merge函数:
defer wg.Done() for n := range c { select { case out <- n: case <-done: return } }我们在for循环前面加入了一行sync.WaitGroup的Done的延迟方法,然后修改了select内部当done可被输出时,直接结束merge函数(更别提跳出循环了),直接执行defer的wg.Done去掉一次计数器数值。然后看main函数:
func main() { done := make(chan struct{}) defer close(done) in := pipeline.Gen(2, 3) c1 := pipeline.Sq(in) c2 := pipeline.Sq(in) out := merge(done, c1, c2) fmt.Println(<-out) }首先我们去掉了done通道的缓冲区,加了一行关闭done通道的延迟操作。当代码执行玩fmt的一次输出以后,main函数执行完毕,会调用defer关闭done通道,回到merge函数中,done通道被关闭以后,case ->done被执行merge函数执行完毕,执行wg.Done()。
总结本文详细阐述了Go管道的概念,是有三组动作:生产通道,处理通道,使用通道,这三组动作实现了Go的管道。通过一个例子我们搞清楚了管道的含义,接着又介绍了Fan-out,是关于多个函数对同一个通道的操作,以及一个函数对多个通道的操作(例子中使用了merge,将多个通道合并为一个)。这期间,我们研究了sync.WaitGroup以及Go语言中的包引用特性。最后,我们在例子中发现了管道并发的问题,并循序渐进地找到了解决方法,在此期间,让我们加深了对defer,管道,通道,select的理解。
参考资料Go官方文档