三.Go微服务--令牌桶实现原理 (2)

这个比较有意思的是先去计算了时间的最大值,因为初始化的时候没为 last 赋值,所以 now.Before(last) 出来的结果可能是一个很大的值,再去计算 tokens 数量很可能溢出

durationFromTokens 根据 tokens 的数量计算需要花费的时间

func (limit Limit) durationFromTokens(tokens float64) time.Duration { seconds := tokens / float64(limit) return time.Nanosecond * time.Duration(1e9*seconds) }

tokensFromDuration根据时间计算 tokens 的数量

func (limit Limit) tokensFromDuration(d time.Duration) float64 { // 这里通过拆分整数和小数部分可以减少时间上的误差 sec := float64(d/time.Second) * float64(limit) nsec := float64(d%time.Second) * float64(limit) return sec + nsec/1e9 } 2.4 消费token的总结

消费 token 的逻辑就讲完了,大概总结一下

需要消费的时候, 先去计算一下,从过去到现在可以生成多少个token

然后通过需要的 token 减去现在拥有的token数量,就得到了需要预约的token数量

再通过token数量 转换成时间,就可以得到需要等待的时间长度,以及是否可以消费

然后再通过不同的消费方式进行消费

2.5 WaitN // ctx 用于控制超时, n 是需要消费的 token 数量,如果 context 的 Deadline 早于要等待的时间就会直接返回失败 func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { lim.mu.Lock() burst := lim.burst limit := lim.limit lim.mu.Unlock() // 先看一下是不是已经超出消费极限了 if n > burst && limit != Inf { return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) } // 如果 ctx 已经结束了也不用等了 select { case <-ctx.Done(): return ctx.Err() default: } // 计算一下可以等待的时间 now := time.Now() waitLimit := InfDuration if deadline, ok := ctx.Deadline(); ok { waitLimit = deadline.Sub(now) } // 调用 reserveN 得到预约数据 r := lim.reserveN(now, n, waitLimit) // 如果不 ok 说明预约不到 if !r.ok { return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) } // 如果可以预约到,计算一下需要等多久 delay := r.DelayFrom(now) if delay == 0 { return nil } // 启动一个 timer 进行定时 t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: // We can proceed. return nil case <-ctx.Done(): // 如果 context 主动取消了,那么之前预约的 token 数量需要归还 r.Cancel() return ctx.Err() } } 2.5 取消消费

WaitN 当中如果预约上了,但是 Context 取消了,会调用 CancelAt 归还 tokens, 实现原理如下

func (r *Reservation) CancelAt(now time.Time) { // 不 ok 说明没有预约上,直接返回就行了 if !r.ok { return } r.lim.mu.Lock() defer r.lim.mu.Unlock() // 如果没有速率限制,或者没有消费 token 或 token 已经被消费了,都不用还了 if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { return } // 计算需要还的 token 数量 // 这里说是需要减去已经预支的 token 数量,但是我发现应该是个 bug,感觉这里减重复了 restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) if restoreTokens <= 0 { return } // 计算当前拥有的 tokens 数量 now, _, tokens := r.lim.advance(now) // 当前拥有的加上需要归还的就是现有的,但是不能大于桶的容量 tokens += restoreTokens if burst := float64(r.lim.burst); tokens > burst { tokens = burst } // 更新 tokens 数量 r.lim.last = now r.lim.tokens = tokens // 如果相等说明后面没有新的 token 消费,所以将状态重置到上一次 if r.timeToAct == r.lim.lastEvent { prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) if !prevEvent.Before(now) { r.lim.lastEvent = prevEvent } } return } 3. 存在的问题

除了上面提到的感觉 cancelAt 可能有一个 bug 外,云神的博客还提到了一个问题,就是如果我们 cancel 了的话,后面已经在等待的任务是不会重新调整的,举个例子

func wait() { l := rate.NewLimiter(10, 10) t := time.Now() l.ReserveN(t, 10) var wg sync.WaitGroup ctx, cancel := context.WithTimeout(context.TODO(), time.Hour) defer cancel() // 注释掉下面这段就不会提前 cancel wg.Add(1) go func() { defer wg.Done() // 模拟出现问题, 200ms就取消了 time.Sleep(200 * time.Millisecond) cancel() }() wg.Add(2) go func() { defer wg.Done() // 如果要等,这个要等 1s 才能执行,但是我们的 ctx 200ms 就会取消 l.WaitN(ctx, 10) fmt.Printf("[1] cost: %s\n", time.Since(t)) }() time.Sleep(100 * time.Millisecond) go func() { defer wg.Done() // 正常情况下,这个要等 1.2 s 才能执行,但是我们前面都取消了 // 这个是不是应该就只需要等 200ms 就执行了 ctx, cancel := context.WithTimeout(context.Background(), time.Hour) defer cancel() l.WaitN(ctx, 2) fmt.Printf("[2] cost: %s\n", time.Since(t)) }() wg.Wait() }

先看一下不提前 cancel 的结果

[1] cost: 1.0002113s [2] cost: 1.2007347s

再看看提前 cancel 的结果

[1] cost: 200.8268ms [2] cost: 1.201066s

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwgjgp.html