多图详解Go中的Channel源码 (2)

这里会对chan做一个判断,如果它是空的,那么对于非阻塞的发送,直接返回 false;对于阻塞的通道,将 goroutine 挂起,并且永远不会返回。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 非阻塞的情况下,如果通道没有关闭,满足以下一条: // 1.没有缓冲区并且当前没有接收者 // 2.缓冲区不为0,并且已满 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } ... }

需要注意的是这里是没有加锁的,go虽然在使用指针读取单个值的时候原子性的,但是读取多个值并不能保证,所以在判断完closed虽然是没有关闭的,那么在读取完之后依然可能在这一瞬间从未关闭状态转变成关闭状态。那么就有两种可能:

通道没有关闭,而且已经满了,那么需要返回false,没有问题;

通道关闭,而且已经满了,但是在非阻塞的发送中返回false,也没有问题;

有关go的一致性原语,可以看这篇:The Go Memory Model。

上面的这些判断被称为 fast path,因为加锁的操作是一个很重的操作,所以能够在加锁之前返回的判断就在加锁之前做好是最好的。

下面接着看看加锁部分的代码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... //加锁 lock(&c.lock) // 是否关闭的判断 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 从 recvq 中取出一个接收者 if sg := c.recvq.dequeue(); sg != nil { // 如果接收者存在,直接向该接收者发送数据,绕过buffer send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ... }

进入了lock区域之后还需要再判断以下close的状态,然后从recvq 中取出一个接收者,如果已经有接收者,那么就向第一个接收者发送当前enqueue的消息。这里需要注意的是如果有接收者在队列中等待,则说明此时的缓冲区是空的。

既然是一行行分析代码,那么我们再进入到send看一下实现:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ... if sg.elem != nil { // 直接把要发送的数据copy到reciever的栈空间 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒对应的 goroutine goready(gp, skip+1) }

在send方法里,sg就是goroutine打包好的对象,ep是对应要发送数据的指针,sendDirect方法会调用memmove进行数据的内存拷贝。然后goready函数会唤醒对应的 goroutine进行调度。

回到chansend方法,继续往下看:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 如果缓冲区没有满,直接将要发送的数据复制到缓冲区 if c.qcount < c.dataqsiz { // 找到buf要填充数据的索引位置 qp := chanbuf(c, c.sendx) ... // 将数据拷贝到 buffer 中 typedmemmove(c.elemtype, qp, ep) // 数据索引前移,如果到了末尾,又从0开始 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } // 元素个数加1,释放锁并返回 c.qcount++ unlock(&c.lock) return true } ... }

这里会判断buf缓冲区有没有满,如果没有满,那么就找到buf要填充数据的索引位置,调用typedmemmove方法将数据拷贝到buf中,然后重新设值sendx偏移量。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 缓冲区没有空间了,所以对于非阻塞调用直接返回 if !block { unlock(&c.lock) return false } // 创建 sudog 对象 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 将sudog 对象入队 c.sendq.enqueue(mysg) // 进入等待状态 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) ... }

这里会做两部分的操作,对于非阻塞的调用会直接返回;对于阻塞的调用会创建sudog 对象,然后将sudog对象入队之后gopark将 goroutine 转入 waiting 状态,并解锁。调用gopark之后,在使用者看来该向 channel 发送数据的代码语句会进行阻塞。

这里也需要注意一下,如果缓冲区为0,那么也会进入到这里,会调用到gopark立马阻塞,所以在使用的时候需要记得接收数据,防止向chan发送数据的那一端永远阻塞,如:

func process(timeout time.Duration) bool { ch := make(chan bool) go func() { // 模拟处理耗时的业务 time.Sleep((timeout + time.Second)) ch <- true // block fmt.Println("exit goroutine") }() select { case result := <-ch: return result case <-time.After(timeout): return false } }

如果这里在select的时候直接timeout返回了,而没有调用 result := <-ch,那么goroutine 就会永远阻塞。

到这里发送的代码就讲解完了,整个流程大致如下:

比如我要执行:ch<-10

检查 recvq 是否为空,如果不为空,则从 recvq 头部取一个 goroutine,将数据发送过去;

如果 recvq 为空,,并且buf没有满,则将数据放入到 buf中;

如果 buf已满,则将要发送的数据和当前 goroutine 打包成sudog,然后入队到sendq队列中,并将当前 goroutine 置为 waiting 状态进行阻塞。

接收数据

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpydgx.html