schedule重新调度
func newstack() { thisg := getg() ...... gp := thisg.m.curg preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt ...... if preempt { ...... // Act like goroutine called runtime.Gosched. gopreempt_m(gp) // never return } } func gopreempt_m(gp *g) { if trace.enabled { traceGoPreempt() } goschedImpl(gp) } func goschedImpl(gp *g) { status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } casgstatus(gp, _Grunning, _Grunnable) // 状态从 _Grunning 改为 _Grunnable。你运行的太久了,下来吧你。 dropg() // 解绑G和M lock(&sched.lock) globrunqput(gp) // 放入全局队列 unlock(&sched.lock) schedule() // 重新调度,进入调度循环。 } 基于信号的异步抢占上述的 preemptone函数会调用preemptM函数,并且最终会调用tgkill系统调用,向需要被抢占的G所在的工作线程发送 _SIGURG 信号。
发送信号 func preemptM(mp *m) { ...... signalM(mp, sigPreempt) } // signalM sends a signal to mp. func signalM(mp *m, sig int) { tgkill(getpid(), int(mp.procid), sig) } TEXT ·tgkill(SB),NOSPLIT,$0 MOVQ tgid+0(FP), DI MOVQ tid+8(FP), SI MOVQ sig+16(FP), DX MOVL $SYS_tgkill, AX SYSCALL RET 执行抢占内核在收到 _SIGURG 信号后,会调用该线程注册的信号处理程序,最终会执行到以下程序。
因为注册逻辑不是问题的关注核心,所以就放在后面有介绍。
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) { _g_ := getg() c := &sigctxt{info, ctxt} ...... if sig == sigPreempt { // const sigPreempt doSigPreempt(gp, c) } ...... } func doSigPreempt(gp *g, ctxt *sigctxt) { // Check if this G wants to be preempted and is safe to // preempt. if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) { // Inject a call to asyncPreempt. ctxt.pushCall(funcPC(asyncPreempt)) } // Acknowledge the preemption. atomic.Xadd(&gp.m.preemptGen, 1) } func (c *sigctxt) pushCall(targetPC uintptr) { // Make it look like the signaled instruction called target. pc := uintptr(c.rip()) sp := uintptr(c.rsp()) sp -= sys.PtrSize *(*uintptr)(unsafe.Pointer(sp)) = pc c.set_rsp(uint64(sp)) c.set_rip(uint64(targetPC)) // pc指向asyncPreempt } // asyncPreempt->asyncPreempt2 func asyncPreempt2() { gp := getg() gp.asyncSafePoint = true if gp.preemptStop { mcall(preemptPark) } else { mcall(gopreempt_m) } gp.asyncSafePoint = false } // gopreempt_m里调用了goschedImpl,这个函数上面分析过,是完成抢占的关键。此时也就是完成了抢占,进入调度循环。 func gopreempt_m(gp *g) { if trace.enabled { traceGoPreempt() } goschedImpl(gp) } 信号处理程序的注册与执行 注册m0的信号处理程序是在整个程序一开始就在 mstart1 中开始注册的。
而其他M所属线程因为在clone的时候指定了 _CLONE_SIGHAND 标记,共享了信号handler table。所以一出生就有了。
注册逻辑如下:
// 省略了一些无关代码 func mstart1() { if _g_.m == &m0 { mstartm0() } } func mstartm0() { initsig(false) } // 循环注册信号处理程序 func initsig(preinit bool) { for i := uint32(0); i < _NSIG; i++ { ...... setsig(i, funcPC(sighandler)) } } // sigtramp注册为处理程序 func setsig(i uint32, fn uintptr) { var sa sigactiont sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART sigfillset(&sa.sa_mask) if GOARCH == "386" || GOARCH == "amd64" { sa.sa_restorer = funcPC(sigreturn) } if fn == funcPC(sighandler) { if iscgo { fn = funcPC(cgoSigtramp) } else { fn = funcPC(sigtramp) } } sa.sa_handler = fn sigaction(i, &sa, nil) } // sigaction->sysSigaction->rt_sigaction // 调用rt_sigaction系统调用,注册处理程序 TEXT runtime·rt_sigaction(SB),NOSPLIT,$0-36 MOVQ sig+0(FP), DI MOVQ new+8(FP), SI MOVQ old+16(FP), DX MOVQ size+24(FP), R10 MOVL $SYS_rt_sigaction, AX SYSCALL MOVL AX, ret+32(FP) RET以上逻辑主要作用就是循环注册 _NSIG(65) 个信号处理程序,其实都是 sigtramp 函数。操作系统内核在收到信号后会调用此函数。
执行sigtramp是入口,sighandler根据不同信号调用处理程序。
TEXT runtime·sigtramp(SB),NOSPLIT,$72 ...... MOVQ DX, ctx-56(SP) MOVQ SI, info-64(SP) MOVQ DI, signum-72(SP) MOVQ $runtime·sigtrampgo(SB), AX CALL AX ...... RET func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) { ...... c := &sigctxt{info, ctx} g := sigFetchG(c) // getg() ...... sighandler(sig, info, ctx, g) ...... }