这一段代码首先会将当前goroutine绑定在当前的P上返回对应的local,然后尝试从local的private中获取,然后需要把private字段置空,因为已经拿到了想要的对象;
private中获取不到,那么就去shared的头部获取;
shared也没有,那么尝试遍历所有的 local,尝试从它们的 shared 弹出一个元素;
最后如果还是没有,那么就直接调用预先设置好的 New 函数,创建一个出来。
pin func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() }pin方法里面首先会调用runtime_procPin方法会先获取当前goroutine,然后绑定到对应的M上,然后返回M目前绑定的P的id,因为这个pid后面会用到,防止在使用途中P被抢占,具体的细节可以看这篇:https://zhuanlan.zhihu.com/p/99710992。
接下来会使用原子操作取出localSize,如果当前pid大于localSize,那么就表示Pool还没创建对应的poolLocal,那么调用pinSlow进行创建工作,否则调用indexLocal取出pid对应的poolLocal返回。
func indexLocal(l unsafe.Pointer, i int) *poolLocal { lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) return (*poolLocal)(lp) }indexLocal里面是使用了地址操作,传入的i是数组的index值,所以需要获取poolLocal{}的size做一下地址的位移操作,然后再转成转成poolLocal地址返回。
pinSlow func (p *Pool) pinSlow() (*poolLocal, int) { // 解除pin runtime_procUnpin() // 加上全局锁 allPoolsMu.Lock() defer allPoolsMu.Unlock() // pin住 pid := runtime_procPin() s := p.localSize l := p.local // 重新对pid进行检查 if uintptr(pid) < s { return indexLocal(l, pid), pid } // 初始化local前会将pool放入到allPools数组中 if p.local == nil { allPools = append(allPools, p) } // 当前P的数量 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) atomic.StoreUintptr(&p.localSize, uintptr(size)) return &local[pid], pid }因为allPoolsMu是一个全局Mutex锁,因此上锁会比较慢可能被阻塞,所以上锁前调用runtime_procUnpin方法解除pin的操作;
在解除绑定后,pinSlow 可能被其他的线程调用过了,p.local 可能会发生变化。因此这时候需要再次对 pid 进行检查。
最后初始化local,并使用原子操作对local和localSize设值,返回当前P对应的local。
到这里pin方法终于讲完了。画一个简单的图描述一下这整个流程:
下面我们再回到Get方法中往下走,代码我再贴一遍,以便阅读:
func (p *Pool) Get() interface{} { ... //2.优先从local的private中获取 x := l.private l.private = nil if x == nil { //3,private没有,那么从shared的头部获取 x, _ = l.shared.popHead() //4. 如果都没有,那么去别的local上去偷一个 if x == nil { x = p.getSlow(pid) } } ... return x }如果private中没有值,那么会调用shared的popHead方法获取值。
popHead func (c *poolChain) popHead() (interface{}, bool) { // 这里头部是一个poolChainElt d := c.head // 遍历poolChain链表 for d != nil { // 从poolChainElt的环状列表中获取值 if val, ok := d.popHead(); ok { return val, ok } // load poolChain下一个对象 d = loadPoolChainElt(&d.prev) } return nil, false }popHead方法里面会获取到poolChain的头结点,不记得poolChain数据结构的同学建议往上面翻一下再回来。
接着有个for循环会挨个从poolChain的头结点往下遍历,直到获取对象返回。
func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) // headTail的高32位为head,低32位为tail head, tail := d.unpack(ptrs) // 首尾相等,那么这个队列就是空的 if tail == head { return nil, false } // 这里需要head--之后再获取slot head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[head&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) // 说明没取到缓存的对象,返回 nil if val == dequeueNil(nil) { val = nil } // 重置slot *slot = eface{} return val, true }
poolDequeue的popHead方法首先会获取到headTail的值,然后调用unpack解包,headTail是一个64位的值,高32位表示head,低32位表示tail。
判断head和tail是否相等,相等那么这个队列就是空的;
如果队列不是空的,那么将head减一之后再使用,因为head当前指的位置是空值,表示下一个新对象存放的位置;