从源码剖析Go语言基于信号抢占式调度 (3)

asyncPreempt2

func asyncPreempt2() { gp := getg() gp.asyncSafePoint = true // 该 G 是否可以被抢占 if gp.preemptStop { mcall(preemptPark) } else { // 让 G 放弃当前在 M 上的执行权利,将 G 放入全局队列等待后续调度 mcall(gopreempt_m) } gp.asyncSafePoint = false }

该函数会获取当前 G ,然后判断 G 的 preemptStop 值,preemptStop 会在调用 runtime/preempt.go的 suspendG 函数的时候将 _Grunning 状态的 Goroutine 标记成可以被抢占 gp.preemptStop = true,表示该 G 可以被抢占。

下面我们看一下执行抢占任务会调用的 runtime/proc.go的 preemptPark函数:

preemptPark

func preemptPark(gp *g) { status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } gp.waitreason = waitReasonPreempted casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted) // 使当前 m 放弃 g,让出线程 dropg() // 修改当前 Goroutine 的状态到 _Gpreempted casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) // 并继续执行调度 schedule() }

preemptPark 会修改当前 Goroutine 的状态到 _Gpreempted ,调用 dropg 让出线程,最后调用 schedule 函数继续执行其他 Goroutine 的任务循环调度。

gopreempt_m

gopreempt_m 方法比起抢占更像是主动让权,然后重新加入到执行队列中等待调度。

func gopreempt_m(gp *g) { goschedImpl(gp) } func goschedImpl(gp *g) { status := readgstatus(gp) ... // 更新状态为 _Grunnable casgstatus(gp, _Grunning, _Grunnable) // 使当前 m 放弃 g,让出线程 dropg() lock(&sched.lock) // 重新加入到全局执行队列中 globrunqput(gp) unlock(&sched.lock) // 并继续执行调度 schedule() } 抢占信号发送

抢占信号的发送是由 preemptM 进行的。

这个函数在runtime/signal_unix.go文件中:

preemptM

const sigPreempt = _SIGURG func preemptM(mp *m) { ... if atomic.Cas(&mp.signalPending, 0, 1) { // preemptM 向 M 发送抢占请求。 // 接收到该请求后,如果正在运行的 G 或 P 被标记为抢占,并且 Goroutine 处于异步安全点, // 它将抢占 Goroutine。 signalM(mp, sigPreempt) } }

preemptM 这个函数会调用 signalM 将在初始化的安装的 _SIGURG 信号发送到指定的 M 上。

使用 preemptM 发送抢占信号的地方主要有下面几个:

Go 后台监控 runtime.sysmon 检测超时发送抢占信号;

Go GC 栈扫描发送抢占信号;

Go GC STW 的时候调用 preemptall 抢占所有 P,让其暂停;

Go 后台监控执行抢占

系统监控 runtime.sysmon 会在循环中调用 runtime.retake抢占处于运行或者系统调用中的处理器,该函数会遍历运行时的全局处理器。

系统监控通过在循环中抢占主要是为了避免 G 占用 M 的时间过长造成饥饿。

runtime.retake主要分为两部分:

调用 preemptone 抢占当前处理器;

调用 handoffp 让出处理器的使用权;

抢占当前处理器

func retake(now int64) uint32 { n := 0 lock(&allpLock) // 遍历 allp 数组 for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { continue } pd := &_p_.sysmontick s := _p_.status sysretake := false if s == _Prunning || s == _Psyscall { // 调度次数 t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) // 处理器上次调度时间 pd.schedwhen = now // 抢占 G 的执行,如果上一次触发调度的时间已经过去了 10ms } else if pd.schedwhen+forcePreemptNS <= now { preemptone(_p_) sysretake = true } } ... } unlock(&allpLock) return uint32(n) }

这一过程会获取当前 P 的状态,如果处于 _Prunning 或者 _Psyscall 状态时,并且上一次触发调度的时间已经过去了 10ms,那么会调用 preemptone 进行抢占信号的发送,preemptone 在上面我们已经讲过了,这里就不再复述。

sysmon_preempt

调用 handoffp 让出处理器的使用权

func retake(now int64) uint32 { n := 0 lock(&allpLock) // 遍历 allp 数组 for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { continue } pd := &_p_.sysmontick s := _p_.status sysretake := false ... if s == _Psyscall { // 系统调用的次数 t := int64(_p_.syscalltick) if !sysretake && int64(pd.syscalltick) != t { pd.syscalltick = uint32(t) // 系统调用的时间 pd.syscallwhen = now continue } if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } unlock(&allpLock) incidlelocked(-1) if atomic.Cas(&_p_.status, s, _Pidle) { n++ _p_.syscalltick++ // 让出处理器的使用权 handoffp(_p_) } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) return uint32(n) }

这一过程会判断 P 的状态如果处于 _Psyscall 状态时,会进行一个判断,有一个不满足则调用 handoffp 让出 P 的使用权:

runqempty(_p_) :判断 P 的任务队列是否为空;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsswsx.html