多图详解Go的互斥锁Mutex

Mutex 结构体包含两个字段:

字段state:表示当前互斥锁的状态。

字段 sema:是个信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒。

type Mutex struct { state int32 sema uint32 }

在Go的1.9版本中,为了解决等待中的 goroutine 可能会一直获取不到锁,增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在 1 毫秒。

state状态字段所表示的含义较为复杂,如下图所示,最低三位分别表示mutexLocked、mutexWoken、mutexStarving,state总共是32位长度,所以剩下的位置,用来表示可以有1<<(32-3)个Goroutine 等待互斥锁的释放:

Group 1

代码表示如下:

const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving ) 加锁流程 fast path func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } m.lockSlow() }

加锁的时候,一开始会通过CAS看一下能不能直接获取锁,如果可以的话,那么直接获取锁成功。

lockSlow // 等待时间 var waitStartTime int64 // 饥饿标记 starving := false // 唤醒标记 awoke := false // 自旋次数 iter := 0 // 当前的锁的状态 old := m.state for { // 锁是非饥饿状态,锁还没被释放,尝试自旋 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 自旋 runtime_doSpin() // 自旋次数加1 iter++ // 设置当前锁的状态 old = m.state continue } ... }

进入到lockSlow方法之后首先会判断以下能否可以自旋,判断依据就是通过计算:

old&(mutexLocked|mutexStarving) == mutexLocked

可以知道当前锁的状态必须是上锁,并且不能处于饥饿状态,这个判断才为true,然后再看看iter是否满足次数的限制,如果都为true,那么则往下继续。

内层if包含了四个判断:

首先判断了awoke是不是唤醒状态;

old&mutexWoken == 0为真表示没有其他正在唤醒的节点;

old>>mutexWaiterShift != 0表明当前有正在等待的goroutine;

CAS将state的mutexWoken状态位设置为old|mutexWoken,即为1是否成功。

如果都满足,那么将awoke状态设置为真,然后将自旋次数加一,并重新设置状态。

继续往下看:

new := old if old&mutexStarving == 0 { // 如果当前不是饥饿模式,那么将mutexLocked状态位设置1,表示加锁 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { // 如果当前被锁定或者处于饥饿模式,则waiter加一,表示等待一个等待计数 new += 1 << mutexWaiterShift } // 如果是饥饿状态,并且已经上锁了,那么mutexStarving状态位设置为1,设置为饥饿状态 if starving && old&mutexLocked != 0 { new |= mutexStarving } // awoke为true则表明当前线程在上面自旋的时候,修改mutexWoken状态成功 if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 清除唤醒标志位 new &^= mutexWoken }

走到这里有两种情况:1. 自旋超过了次数;2. 目前锁没有被持有。

所以第一个判断,如果当前加了锁,但是没有处于饥饿状态,也会重复设置new |= mutexLocked,即将mutexLocked状态设置为1;

如果是old已经是饥饿状态或者已经被上锁了,那么需要设置Waiter加一,表示这个goroutine下面不会获取锁,会等待;

如果starving为真,表示当前goroutine是饥饿状态,并且old已经被上锁了,那么设置new |= mutexStarving,即将mutexStarving状态位设置为1;

awoke如果在自旋时设置成功,那么在这里要new &^= mutexWoken消除mutexWoken标志位。因为后续流程很有可能当前线程会被挂起,就需要等待其他释放锁的goroutine来唤醒,如果unlock的时候发现mutexWoken的位置不是0,则就不会去唤醒,则该线程就无法再醒来加锁。

继续往下:

if atomic.CompareAndSwapInt32(&m.state, old, new) { // 1.如果原来状态没有上锁,也没有饥饿,那么直接返回,表示获取到锁 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 2.到这里是没有获取到锁,判断一下等待时长是否不为0 // 如果不为0,那么加入到队列头部 queueLifo := waitStartTime != 0 // 3.如果等待时间为0,那么初始化等待时间 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 4.阻塞等待 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 5.唤醒之后检查锁是否应该处于饥饿状态 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 6.判断是否已经处于饥饿状态 if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 7.加锁并且将waiter数减1 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // 8.如果当前goroutine不是饥饿状态,就从饥饿模式切换会正常模式 delta -= mutexStarving } // 9.设置状态 atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state }

到这里,首先会CAS设置新的状态,如果设置成功则往下走,否则返回之后循环设置状态。设置成功之后:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxfjx.html