3. Sentinel源码分析— QPS流量控制是如何实现的? (2)

它的中心思想是,以固定的间隔时间让请求通过。当请求到来的时候,如果当前请求距离上个通过的请求通过的时间间隔不小于预设值,则让当前请求通过;否则,计算当前请求的预期通过时间,如果该请求的预期通过时间小于规则预设的 timeout 时间,则该请求会等待直到预设时间到来通过(排队等待处理);若预期的通过时间超出最大排队时长,则直接拒接这个请求。

这种方式适合用于请求以突刺状来到,这个时候我们不希望一下子把所有的请求都通过,这样可能会把系统压垮;同时我们也期待系统以稳定的速度,逐步处理这些请求,以起到“削峰填谷”的效果,而不是拒绝所有请求。

要想使用这个策略需要在实例化FlowRule的时候设置rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)这样的一句代码。

在实例化Rater的时候会调用FlowRuleUtil#generateRateri创建一个实例:

new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());

MaxQueueingTimeMs默认是500 ,Count在我们这个例子中传入的是20。

我们看一下具体的canPass方法是怎么实现限流的:

public boolean canPass(Node node, int acquireCount, boolean prioritized) { // Pass when acquire count is less or equal than 0. if (acquireCount <= 0) { return true; } // Reject when count is less or equal than 0. // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. if (count <= 0) { return false; } long currentTime = TimeUtil.currentTimeMillis(); //两个请求预期通过的时间,也就是说把请求平均分配到1秒上 // Calculate the interval between every two requests. long costTime = Math.round(1.0 * (acquireCount) / count * 1000); //latestPassedTime代表的是上一次调用请求的时间 // Expected pass time of this request. long expectedTime = costTime + latestPassedTime.get(); //如果预期通过的时间加上上次的请求时间小于当前时间,则通过 if (expectedTime <= currentTime) { // Contention may exist here, but it's okay. latestPassedTime.set(currentTime); return true; } else { //默认是maxQueueingTimeMs // Calculate the time to wait. long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); //如果预提时间比当前时间大maxQueueingTimeMs那么多,那么就阻塞 if (waitTime > maxQueueingTimeMs) { return false; } else { //将上次时间加上这次请求要耗费的时间 long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); //再次判断一下是否超过maxQueueingTimeMs设置的时间 if (waitTime > maxQueueingTimeMs) { //如果是的话就阻塞,并重置上次通过时间 latestPassedTime.addAndGet(-costTime); return false; } //如果需要等待的时间大于零,那么就sleep // in race condition waitTime may <= 0 if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; }

这个方法一开始会计算一下costTime这个值,将请求平均分配到一秒中。例如:当 count 设为 10 的时候,则代表一秒匀速的通过 10 个请求,也就是每个请求平均间隔恒定为 1000 / 10 = 100 ms。

但是这里有个小bug,如果count设置的比较大,比如设置成10000,那么costTime永远都会等于0,整个QPS限流将会失效。

然后会将costTime和上次的请求时间相加,如果大于当前时间就表明请求的太频繁了,会将latestPassedTime这个属性加上这次请求的costTime,并调用sleep方法让这个线程先睡眠一会再请求。

这里有个细节,如果多个请求同时一起过来,那么每个请求在设置oldTime的时候都会通过addAndGet这个原子性的方法将latestPassedTime依次相加,并赋值给oldTime,这样每个线程的sleep的时间都不会相同,线程也不会同时醒来。

WarmUpController限流 冷启动

当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。

//默认为3 private int coldFactor; //转折点的令牌数 protected int warningToken = 0; //最大的令牌数 private int maxToken; //斜线斜率 protected double slope; //累积的令牌数 protected AtomicLong storedTokens = new AtomicLong(0); //最后更新令牌的时间 protected AtomicLong lastFilledTime = new AtomicLong(0); public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { construct(count, warmUpPeriodInSec, coldFactor); } private void construct(double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; //默认是3 this.coldFactor = coldFactor; // thresholdPermits = 0.5 * warmupPeriod / stableInterval. // 10*20/2 = 100 // warningToken = 100; warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1); // / maxPermits = thresholdPermits + 2 * warmupPeriod / // (stableInterval + coldInterval) // maxToken = 200 maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); // slope // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits // - thresholdPermits); slope = (coldFactor - 1.0) / count / (maxToken - warningToken); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyywfs.html