Go并发模式:管道与取消

关键字:Go语言管道,取消机制,并发,sync.WaitGroup,包引用,通道,defer,select

GO并发模式:管道与取消 简介

Go的并发能力可以使构建一个流数据管道变得非常容易,并且可以高校地使用机器I/O和多核处理器。这篇文章展示了一些例子,包括管道,对操作失败的处理技术。

管道的概念

在Go里,并没有正式的管道的定义,它只是众多并发程序其中的一个。通俗来讲,一个管道是一系列由通道连接的阶段,每个阶段都是一组运行着同样函数的goroutine。在每个阶段里,goroutine在干着:

通过接入通道(inbound channels)接收上游流下来的值

对这些数据执行某个函数,通常会产生新的值

通过导出通道(outbound channels)下游发送值

第一个阶段也叫source或者producer

最后一个阶段也叫sink或者consumer
以上这两个阶段都只能有一个通道,或者是接入通道或者是导出通道,不能同时拥有这两种。而其他每个阶段都可以共同拥有任意数量的接入通道和导出通道。

一个用来学习的例子

下面我们将展开一个简单的管道例子,来阐述其中的思想和技术,后面会有实际的例子。

平方函数

直接看代码中注释。

注意goroutine是函数体内并发,有一个壳sandbox扣着它。

// 要想run,必须package main,默认是文件夹目录名,要更改一下 package main import "fmt" // 设想一个拥有三个阶段的管道 /* * First Stage: gen * params: 一个以逗号分隔的整数列表,数量不限 * return: 一个通道,包含参数中整数列表的通道 */ func gen(nums ... int) <-chan int { out := make(chan int) // 通过一个goroutine来将参数中的每个整数发送到通道中去。 go func() { for _, n := range nums { out <- n } close(out) // close方法作为上面的for循环的终止条件,不能省略。 }() return out } /* * Second Stage: sq * params: 一个包含参数中整数列表的通道 * return: 一个包含将参数通道中每个整数平方后的列表的通道 * note: 因为参数和返回值的类型都是相同的整型通道,所以可以反复嵌套该方法。 */ func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n // 平方 } close(out) }() return out } /* * Final Stage: main * 是一个main函数,没有参数也没有返回值,它相当于客户端调用 */ func main() { c := gen(2, 3) // 建立通道 out := sq(c) // 通道处理 // 上面传入两个值2和3,那么这里就要对应的消费两次输出 fmt.Println(<-out) fmt.Println(<-out) // 嵌套sq for n := range sq(sq(gen(1, 2, 4, 5))) { fmt.Println(n) } } // output: // 4 // 9 // 1 // 16 // 256 // 625 Fan-out和Fan-in

Fan-out,扇出。多个函数可以读取同一个通道直到该通道关闭。可让一群工人并用CPU和IO

Fan-in,扇入。一个函数可以读取多个输入,每个输入被多路复用到一个独立的通道上,当所有输入被关闭时,这个通道也会被关闭,同时它也会关掉这个函数的使用权。

下面我们将运行连个sq函数的实例,都会读取同一个输入通道,我们将使用一个新函数,叫做merge,来扇入多个结果。

向一个已关闭的通道发送值,会引起通道panic错误,所以引入了sync.WaitGroup功能来控制当所有发送行为结束以后关闭通道。

sync.WaitGroup

sync.WaitGroup像java的倒计时锁,首先我们定义它的Wait方法设置一个锁到某个并发程序中,然后通过Add方法定义计数器大小CounterSize,该大小为最多发送数据到通道的执行次数,每次执行结束要通过Done方法来使CounterSize减一,直到CounterSize为0,上面我们定义的Wait才会释放锁。

注意,WaitGroup的计数器大小CounterSize在初始化时默认为1,也就是说没调用Add之前,需要一次Done方法执行以后,Wait锁才会释放。

merge函数 func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup // 定义一个独立通道out,将接收所有值 out := make(chan int) // 将通道中所有值转到out output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) // 将merge参数中所有通道的值都合到唯一通道out上去 for _, c := range cs { go output(c) } // 启动一个额外的goroutine(不会按照代码顺序执行,而是一进到merge就会启动)来等待直到所有通道Done以后关闭那个唯一通道out。 go func() { wg.Wait()// 直到wg全都Done了才会继续执行。 close(out) }() return out } Go的包引用问题

当我们要使用其他Go文件内部的函数时,会有两种处理方法:

将函数绑定到某个type下,然后在调用时创建那个type的实例即可调用。

将函数名首字母大写,我们就可以通过包名调用了。

以上两种方法都会存在一个问题,就是包引用问题,如果你找不到源码位置,调用其函数就无从谈起,那么如何正确的引用包呢?

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpfpwy.html