Go中定时器实现原理及源码解析 (5)

检查是否有需要进行调整的 timer, 如果没有需要执行的计时器时,直接返回;如果下一个要执行的 timer 没有到期并且需要删除的计时器较少(四分之一)时也会直接返回;

调用 adjusttimers 进行 timer 列表的调整,主要是维护 timer 列表的最小堆的顺序;

调用 runtime.runtimer查找堆中是否存在需要执行的timer, runtime.runtimer上面已经讲过了,这里不再赘述;

如果当前 Goroutine 的 P 和传入的 P 相同,并且需要删除的 timer 超过了 timer 列表数量的四分之一,那么调用 clearDeletedTimers 清理需要删除的 timer;

runtime.findrunnable

func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() ... // 检查 P 中可执行的 timer now, pollUntil, _ := checkTimers(_p_, 0) ... // 如果 netpoll 已被初始化,并且 Waiters 大于零,并且 lastpoll 不为0 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { // 尝试从netpoller获取Glist if list := netpoll(0); !list.empty() { // 无阻塞 gp := list.pop() //将其余队列放入 P 的可运行G队列 injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } ... // 开始窃取 for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } // 如果 i>2 表示如果其他 P 运行队列中没有 G ,将要从其他队列的 runnext 中获取 stealRunNextG := i > 2 // first look for ready queues with more than 1 g // 随机获取一个 P p2 := allp[enum.position()] if _p_ == p2 { continue } // 从其他 P 的运行队列中获取一般的 G 到当前队列中 if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil { return gp, false } // 如果运行队列中没有 G,那么从 timers 中获取可执行的 timer if i > 2 || (i > 1 && shouldStealTimers(p2)) { // ran 为 true 表示有执行过 timer tnow, w, ran := checkTimers(p2, now) now = tnow if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w } if ran { // 因为已经运行过 timer 了,说不定已经有准备就绪的 G 了 // 再次检查本地队列尝试获取 G if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } ranTimer = true } } } } if ranTimer { // 执行完一个 timer 后可能存在已经就绪的 G goto top } stop: ... delta := int64(-1) if pollUntil != 0 { // checkTimers ensures that polluntil > now. delta = pollUntil - now } ... // poll network // 休眠前再次检查 poll 网络 if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { ... list := netpoll(delta) // 阻塞调用 lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ == nil { injectglist(&list) } else { acquirep(_p_) if !list.empty() { gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } goto top } } else if pollUntil != 0 && netpollinited() { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil > pollUntil { netpollBreak() } } // 休眠当前 M stopm() goto top }

findrunnable 我在这篇文章 https://www.luozhiyun.com/archives/448 已经讲的很清楚了,这里提取 timer 相关的代码分析一下:

findrunnable 在窃取前先会调用 checkTimers 检查 P 中可执行的 timer;

如果 netpoll 中有等待的 waiter,那么会调用 netpoll 尝试无阻塞的从netpoller获取Glist;

如果获取不到可执行的 G,那么就会开始执行窃取。窃取的时候会调用 checkTimers 随机从其他的 P 中获取 timer;

窃取完毕后也没有可执行的 timer,那么会继续往下,休眠前再次检查 netpoll 网络,调用 netpoll(delta) 函数进行阻塞调用。

系统监控触发

系统监控其实就是 Go 语言的守护进程,它们能够在后台监控系统的运行状态,在出现意外情况时及时响应。它会每隔一段时间检查 Go 语言运行时状态,确保没有异常发生。我们这里不主要去讲系统监控,只抽离出其中的和 timer 相关的代码进行讲解。

runtime.sysmon

func sysmon() { ... for { ... now := nanotime() // 返回下次需要调度 timer 到期时间 next, _ := timeSleepUntil() ... // 如果超过 10ms 没有 poll,则 poll 一下网络 lastpoll := int64(atomic.Load64(&sched.lastpoll)) if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) list := netpoll(0) // 非阻塞,返回 G 列表 // G 列表不为空 if !list.empty() { incidlelocked(-1) // 将获取到的 G 列表插入到空闲的 P 中或全局列表中 injectglist(&list) incidlelocked(1) } } // 如果有 timer 到期 if next < now { // 启动新的 M 处理 timer startm(nil, false) } ... } }

sysmon 会通过 timeSleepUntil 遍历所有的 P 的 timer 列表,找到下一个需要执行的 timer;

如果超过 10ms 没有 poll,则 poll 一下网络;

如果有 timer 到期,这个时候直接启动新的 M 处理 timer;

netpoll 的作用

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpddsg.html