常用限流算法与Guava RateLimiter源码解析 (2)

1.关键属性

/** 桶中当前拥有的令牌数. */ double storedPermits; /** 桶中最多可以保存多少秒存入的令牌数 */ double maxBurstSeconds; /** 桶中能存储的最大令牌数,等于storedPermits*maxBurstSeconds. */ double maxPermits; /** 放入令牌的时间间隔*/ double stableIntervalMicros; /** 下次可获取令牌的时间点,可以是过去也可以是将来的时间点*/ private long nextFreeTicketMicros = 0L;

2.关键方法

调用 RateLimiter.create(double permitsPerSecond) 方法时,创建的是 SmoothBursty 实例,默认设置 maxBurstSeconds 为1s。SleepingStopwatch 是guava中的一个时钟类实现。

@VisibleForTesting static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; }

并通过调用 SmoothBursty.doSetRate(double, long) 方法进行初始化,该方法中:

调用 resync(nowMicros) 对 storedPermits 与 nextFreeTicketMicros 进行了调整——如果当前时间晚于 nextFreeTicketMicros,则计算这段时间内产生的令牌数,累加到 storedPermits 上,并更新下次可获取令牌时间 nextFreeTicketMicros 为当前时间。

计算 stableIntervalMicros 的值,1/permitsPerSecond。

调用 doSetRate(double, double) 方法计算 maxPermits 值(maxBurstSeconds*permitsPerSecond),并根据旧的 maxPermits 值对 storedPermits 进行调整。

源码如下所示

@Override final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */ void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } }

调用 acquire(int) 方法获取指定数量的令牌时,

调用 reserve(int) 方法,该方法最终调用 reserveEarliestAvailable(int, long) 来更新下次可取令牌时间点与当前存储的令牌数,并返回本次可取令牌的时间点,根据该时间点计算需要等待的时间

阻塞等待1中返回的等待时间

返回等待的时间(秒)

源码如下所示

/** 获取指定数量(permits)的令牌,阻塞直到获取到令牌,返回等待的时间*/ @CanIgnoreReturnValue public double acquire(int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } /** 返回需要等待的时间*/ final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } /** 针对此次需要获取的令牌数更新下次可取令牌时间点与存储的令牌数,返回本次可取令牌的时间点*/ @Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); // 更新当前数据 long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可消费的令牌数 double freshPermits = requiredPermits - storedPermitsToSpend; // 需要新增的令牌数 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 需要等待的时间 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 更新下次可取令牌的时间点 this.storedPermits -= storedPermitsToSpend; // 更新当前存储的令牌数 return returnValue; }

acquire(int) 方法是获取不到令牌时一直阻塞,直到获取到令牌,tryAcquire(int,long,TimeUnit) 方法则是在指定超时时间内尝试获取令牌,如果获取到或超时时间到则返回是否获取成功

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzjpdf.html