多图详解Go的sync.Pool源码 (3)

CAS重新设值新的headTail,成功之后获取slot,这里因为vals大小是2的n 次幂,因此len(d.vals)-1)之后低n位全是1,和head取与之后可以获取到head的低n位的值;

如果slot所对应的对象是dequeueNil,那么表示是空值,直接返回,否则将slot指针对应位置的值置空,返回val。

如果shared的popHead方法也没获取到值,那么就需要调用getSlow方法获取了。

getSlow func (p *Pool) getSlow(pid int) interface{} { size := atomic.LoadUintptr(&p.localSize) // load-acquire locals := p.local // load-consume // 遍历locals列表,从其他的local的shared列表尾部获取对象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) // victim的private不为空则返回 if x := l.private; x != nil { l.private = nil return x } // 遍历victim对应的locals列表,从其他的local的shared列表尾部获取对象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 获取不到,将victimSize置为0 atomic.StoreUintptr(&p.victimSize, 0) return nil }

getSlow方法会遍历locals列表,这里需要注意的是,遍历是从索引为 pid+1 的 poolLocal 处开始,尝试调用shared的popTail方法获取对象;如果没有拿到,则从 victim 里找。如果都没找到,那么就将victimSize置为0,下次就不找victim了。

poolChain&popTail func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) // 如果最后一个节点是空的,那么直接返回 if d == nil { return nil, false } for { // 这里获取的是next节点,与一般的双向链表是相反的 d2 := loadPoolChainElt(&d.next) // 获取尾部对象 if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { return nil, false } // 因为d已经没有数据了,所以重置tail为d2,并删除d2的上一个节点 if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { storePoolChainElt(&d2.prev, nil) } d = d2 } }

判断poolChain,如果最后一个节点是空的,那么直接返回;

进入for循环,获取tail的next节点,这里需要注意的是这个双向链表与一般的链表是反向的,不清楚的可以再去看看第一张图;

调用popTail获取poolDequeue列表的对象,有对象直接返回;

d2为空则表示已经遍历完整个poolChain双向列表了,都为空,那么直接返回;

通过CAS将tail重置为d2,因为d已经没有数据了,并将d2的prev节点置为nil,然后将d置为d2,进入下一个循环;

poolDequeue&popTail func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) // 和pophead一样,将headTail解包 head, tail := d.unpack(ptrs) // 首位相等,表示列表中没有数据,返回 if tail == head { return nil, false } ptrs2 := d.pack(head, tail+1) // CAS重置tail位置 if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // 获取tail位置对象 slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) // 判断对象是不是为空 if val == dequeueNil(nil) { val = nil } // 将slot置空 slot.val = nil atomic.StorePointer(&slot.typ, nil) return val, true }

如果看懂了popHead,那么这个popTail方法是和它非常的相近的。

popTail简单来说也是从队列尾部移除一个元素,如果队列为空,返回 false。但是需要注意的是,这个popTail可能会被多个消费者调用,所以需要循环CAS获取对象;在poolDequeue环状列表中tail是有数据的,不必像popHead中head--。

最后,需要将slot置空。

大家可以再对照一下图回顾一下代码:

Group 39

Put方法 func (p *Pool) Put(x interface{}) { if x == nil { return } ... l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime_procUnpin() ... }

看完了Get方法,看Put方法就容易多了。同样Put方法首先会去Pin住当前goroutine和P,然后尝试将 x 赋值给 private 字段。如果private不为空,那么就调用pushHead将其放入到shared队列中。

poolChain&pushHead func (c *poolChain) pushHead(val interface{}) { d := c.head // 头节点没有初始化,那么设值一下 if d == nil { const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } // 将对象加入到环状队列中 if d.pushHead(val) { return } newSize := len(d.vals) * 2 // 这里做了限制,单个环状队列不能超过2的30次方大小 if newSize >= dequeueLimit { newSize = dequeueLimit } // 初始化新的环状列表,大小是d的两倍 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) // push到新的队列中 d2.pushHead(val) }

如果头节点为空,那么需要创建一个新的poolChainElt对象作为头节点,大小为8;然后调用pushHead放入到环状队列中;

如果放置失败,那么创建一个 poolChainElt 节点,并且双端队列的长度翻倍,当然长度也不能超过dequeueLimit,即2的30次方;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdspz.html