多图详解Go中的Channel源码 (4)

然后再来看看chanrecv中发送者队列有数据的时候移交缓冲区的数据是怎么做的:

这里会将recvx为0处的数据直接从缓存区拷贝数据给接收者,然后将发送者拷贝数据到缓冲区recvx指针处,然后将recvx指针加1并将recvx赋值给sendx,由于是满的所以用recvx加1的效果实现了将新加入的数据入库到队尾的操作。

接着往下看:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 如果缓冲区中有数据 if c.qcount > 0 { qp := chanbuf(c, c.recvx) ... // 从缓冲区复制数据到 ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 接收数据的指针前移 c.recvx++ // 环形队列,如果到了末尾,再从0开始 if c.recvx == c.dataqsiz { c.recvx = 0 } // 缓冲区中现存数据减一 c.qcount-- unlock(&c.lock) return true, true } ... }

到了这里,说明缓冲区中有数据,但是发送者队列没有数据,那么将数据拷贝到接收数据的协程,然后将接收数据的指针前移,如果已经到了队尾,那么就从0开始,最后将缓冲区中现存数据减一并解锁。

下面就是缓冲区中没有数据的情况:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 非阻塞,直接返回 if !block { unlock(&c.lock) return false, false } // 创建sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 将sudog添加到接收队列中 c.recvq.enqueue(mysg) // 阻塞住goroutine,等待被唤醒 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) ... }

如果是非阻塞调用,直接返回;阻塞调用会将当前goroutine 封装成sudog,然后将sudog添加到接收队列中,调用gopark阻塞住goroutine,等待被唤醒。

关闭通道

关闭通道会调用到closechan方法:

func closechan(c *hchan) { // 1. 校验chan是否已初始化 if c == nil { panic(plainError("close of nil channel")) } // 加锁 lock(&c.lock) // 如果已关闭了,那么不能被再次关闭 if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } ... // 设置chan已关闭 c.closed = 1 // 申明一个存放g的list,用于存放在等待队列中的groutine var glist gList // 2. 获取所有接收者 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } // 加入队列中 glist.push(gp) } // 获取所有发送者 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } // 加入队列中 glist.push(gp) } unlock(&c.lock) // 3.唤醒所有的glist中的goroutine for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }

这个方法首先会校验chan是否已被初始化,然后加锁之后再校验是否已被关闭过,如果校验都通过了,那么将closed字段设值为1;

遍历所有的接收者和发送者,并将其goroutine 加入到glist中;

将所有glist中的goroutine加入调度队列,等待被唤醒,这里需要注意的是发送者在被唤醒之后会panic;

总结

chan在go中是一个非常强大的工具,使用它可以实现很多功能,但是为了能够高效的使用它我们也应该去了解里面是如何实现的。这篇文章通过一步步分析从零开始了解go的chan是如何实现的,以及在使用过程中有什么需要注意的事项,chan的buf环形队列是怎样维护的,希望能对你有所帮助~

Reference

https://speakerdeck.com/kavya719/understanding-channels

https://golang.org/ref/mem

https://github.com/talkgo/night/issues/450

https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpydgx.html