从chan获取数据实现函数为 chanrecv。下面我们看一下代码实现:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if c == nil { // 如果 c 为空且是非阻塞调用,那么直接返回 (false,false) if !block { return } // 阻塞调用直接等待 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 对于非阻塞的情况,并且没有关闭的情况 // 如果是无缓冲chan或者是chan中没有数据,那么直接返回 (false,false) if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } // 上锁 lock(&c.lock) // 如果已经关闭,并且chan中没有数据,返回 (true,false) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } ... }chanrecv方法和chansend方法是一样的,首先也是做非空判断,如果chan没有初始化,那么如果是非阻塞调用,那么直接返回 (false,false),阻塞调用会直接等待;
下面的两个if判断我放在一起来进行讲解,因为这里和chansend是不一样的,chanrecv要根据不同条件需要返回不同的结果。
在上锁之前的判断是边界条件的判断:如果是非阻塞调用会判断chan没有发送方(dataqsiz为空且发送队列为空),或chan的缓冲为空(dataqsiz>0 并且qcount==0)并且chan是没有close,那么需要返回 (false,false);而chan已经关闭了,并且buf中没有数据,需要返回 (true,false);
为了实现这个需求,所以在chanrecv方法里面边界条件的判断都使用atomic方法进行了获取。
因为需要正确的得到chan已关闭,并且 buf 空会返回 (true, false),而不是 (false,false),所以在lock上锁之前需要使用atomic来获取参数防止重排序(Happens Before),因此必须使此处的 qcount 和 closed 的读取操作的顺序通过原子操作得到顺序保障。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 从发送者队列获取数据 if sg := c.sendq.dequeue(); sg != nil { // 发送者队列不为空,直接从发送者那里提取数据 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ... } func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果是无缓冲区chan if c.dataqsiz == 0 { ... if ep != nil { // 直接从发送者拷贝数据 recvDirect(c.elemtype, sg, ep) } // 有缓冲区chan } else { // 获取buf的存放数据指针 qp := chanbuf(c, c.recvx) ... // 直接从缓冲区拷贝数据给接收者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 从发送者拷贝数据到缓冲区 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 将发送者唤醒 goready(gp, skip+1) }在这里如果有发送者在队列等待,那么直接从发送者那里提取数据,并且唤醒这个发送者。需要注意的是由于有发送者在等待,所以如果有缓冲区,那么缓冲区一定是满的。
在唤醒发送者之前需要对缓冲区做判断,如果是无缓冲区,那么直接从发送者那里提取数据;如果有缓冲区首先会获取recvx的指针,然后将从缓冲区拷贝数据给接收者,再将发送者数据拷贝到缓冲区。
然后将recvx加1,相当于将新的数据移到了队尾,再将recvx的值赋值给sendx,最后调用goready将发送者唤醒,这里有些绕,我们通过图片来展示:
这里展示的是在chansend中将数据拷贝到缓冲区中,当数据满的时候会将sendx的指针置为0,所以当buf环形队列是满的时候sendx等于recvx。