cleantimers 函数中使用了一个无限循环来获取头节点。如果头节点的状态是 timerDeleted ,那么需要从 timer 列表中删除;如果头节点的状态是 timerModifiedEarlier 或 timerModifiedLater ,表示头节点的触发的时间被修改到了更早或更晚的时间,那么就先从 timer队列中删除再重新添加。
runtime.doaddtimer
func doaddtimer(pp *p, t *timer) { // Timers 依赖于 netpoller // 所以如果 netpoller 没有启动,需要启动一下 if netpollInited == 0 { netpollGenericInit() } // 校验是否早已在 timer 列表中 if t.pp != 0 { throw("doaddtimer: P already set in timer") } // 设置 timer 与 P 的关联 t.pp.set(pp) i := len(pp.timers) // 将 timer 加入到 P 的 timer 列表中 pp.timers = append(pp.timers, t) // 维护 timer 在 最小堆中的位置 siftupTimer(pp.timers, i) // 如果 timer 是列表中头节点,需要设置一下 timer0When if t == pp.timers[0] { atomic.Store64(&pp.timer0When, uint64(t.when)) } atomic.Xadd(&pp.numTimers, 1) }doaddtimer 函数实际上很简单,主要是将 timer 与 P 设置关联关系,并将 timer 加入到 P 的 timer 列表中,并维护 timer 列表最小堆的顺序。
runtime.wakeNetPoller
func wakeNetPoller(when int64) { if atomic.Load64(&sched.lastpoll) == 0 { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) // 如果计时器的触发时间小于netpoller的下一次轮询时间 if pollerPollUntil == 0 || pollerPollUntil > when { // 向netpollBreakWr里面写入数据,立即中断netpoll netpollBreak() } } } func netpollBreak() { if atomic.Cas(&netpollWakeSig, 0, 1) { for { var b byte // 向 netpollBreakWr 里面写入数据 n := write(netpollBreakWr, unsafe.Pointer(&b), 1) if n == 1 { break } if n == -_EINTR { continue } if n == -_EAGAIN { return } println("runtime: netpollBreak write failed with", -n) throw("runtime: netpollBreak write failed") } } }wakeNetPoller 主要是将 timer 下次调度的时间和 netpoller 的下一次轮询时间相比,如果小于的话,调用 netpollBreak 向 netpollBreakWr 里面写入数据,立即中断netpoll。具体如何中断的,我们下面再聊。
stopTimer 终止 timer终止 timer 的逻辑主要是 timer 的状态的变更:
如果该timer处于 timerWaiting 或 timerModifiedLater 或 timerModifiedEarlier:
timerModifying -> timerDeleted
如果该timer处于 其他状态:
待状态改变或者直接返回
所以在终止 timer 的过程中不会去删除 timer,而是标记一个状态,等待被删除。
modTimer 修改 timer func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool { if when < 0 { when = maxWhen } status := uint32(timerNoStatus) wasRemoved := false var pending bool var mp *m loop: for { // 修改 timer 状态 switch status = atomic.Load(&t.status); status { ... } t.period = period t.f = f t.arg = arg t.seq = seq // 如果 timer 已被删除,那么需要重新添加到 timer 列表中 if wasRemoved { t.when = when pp := getg().m.p.ptr() lock(&pp.timersLock) doaddtimer(pp, t) unlock(&pp.timersLock) if !atomic.Cas(&t.status, timerModifying, timerWaiting) { badTimer() } releasem(mp) wakeNetPoller(when) } else { t.nextwhen = when newStatus := uint32(timerModifiedLater) // 如果修改后的时间小于修改前的时间,将状态设置为 timerModifiedEarlier if when < t.when { newStatus = timerModifiedEarlier } ... if !atomic.Cas(&t.status, timerModifying, newStatus) { badTimer() } releasem(mp) // 如果修改时间提前,那么触发 netpoll 中断 if newStatus == timerModifiedEarlier { wakeNetPoller(when) } } return pending }modtimer 进入到 for 循环后会根据不同的状态做状态设置以及必要字段的处理;如果是 timer 已被删除,那么需要重新添加到 timer 列表中;如果 timer 修改后的时间小于修改前的时间,将状态设置为 timerModifiedEarlier,修改时间提前,还需要触发 netpoll 中断。
timer 的运行聊完了如何添加 timer,下面我们来看看 timer 是如何运行的。timer 的运行是交给 runtime.runtimer函数执行的,这个函数会检查 P 上最小堆的最顶上的 timer 的状态,根据状态做不同的处理。
func runtimer(pp *p, now int64) int64 { for { // 获取最小堆的第一个元素 t := pp.timers[0] if t.pp.ptr() != pp { throw("runtimer: bad p") } // 获取 timer 状态 switch s := atomic.Load(&t.status); s { // timerWaiting case timerWaiting: // 还没到时间,返回下次执行时间 if t.when > now { // Not ready to run. return t.when } // 修改状态为 timerRunning if !atomic.Cas(&t.status, s, timerRunning) { continue } // 运行该 timer runOneTimer(pp, t, now) return 0 // timerDeleted case timerDeleted: if !atomic.Cas(&t.status, s, timerRemoving) { continue } // 删除最小堆的第一个 timer dodeltimer0(pp) if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { badTimer() } atomic.Xadd(&pp.deletedTimers, -1) if len(pp.timers) == 0 { return -1 } // 需要重新移动位置的 timer case timerModifiedEarlier, timerModifiedLater: if !atomic.Cas(&t.status, s, timerMoving) { continue } t.when = t.nextwhen // 删除最小堆的第一个 timer dodeltimer0(pp) // 将该 timer 重新添加到最小堆 doaddtimer(pp, t) if s == timerModifiedEarlier { atomic.Xadd(&pp.adjustTimers, -1) } if !atomic.Cas(&t.status, timerMoving, timerWaiting) { badTimer() } case timerModifying: osyield() case timerNoStatus, timerRemoved: badTimer() case timerRunning, timerRemoving, timerMoving: badTimer() default: badTimer() } } }runtimer 里面会启动一个 for 循环,不停的检查 P 的 timer 列表的第一个元素的状态。
如果该 timer 处于 timerWaiting,那么判断当前的时间大于 timer 执行的时间,则调用 runOneTimer 执行;
如果该 timer 处于 timerDeleted,表示该 timer 是需要被删除的,那么调用 dodeltimer0 删除最小堆的第一个 timer ,并修改其状态;