3. Sentinel源码分析— QPS流量控制是如何实现的? (3)

这里我拿一张图来说明一下:

3. Sentinel源码分析— QPS流量控制是如何实现的?

X 轴代表 storedPermits 的数量,Y 轴代表获取一个 permits 需要的时间。

假设指定 permitsPerSecond 为 10,那么 stableInterval 为 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍 )。也就是说,当达到 maxPermits 时,此时处于系统最冷的时候,获取一个 permit 需要 300ms,而如果 storedPermits 小于 thresholdPermits 的时候,只需要 100ms。

利用 “获取冷的 permits ” 需要等待更多时间,来限制突发请求通过,达到系统预热的目的。

所以在我们的代码中,maxToken代表的就是图中的maxPermits,warningToken代表的就是thresholdPermits,slope就是代表每次获取permit减少的程度。

我们接下来看看WarmUpController的canpass方法:

WarmUpController#canpass

public boolean canPass(Node node, int acquireCount, boolean prioritized) { //获取当前时间窗口的流量大小 long passQps = (long) node.passQps(); //获取上一个窗口的流量大小 long previousQps = (long) node.previousPassQps(); //设置 storedTokens 和 lastFilledTime 到正确的值 syncToken(previousQps); // 开始计算它的斜率 // 如果进入了警戒线,开始调整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { //通过计算当前的restToken和警戒线的距离来计算当前的QPS //离警戒线越接近,代表这个程序越“热”,从而逐步释放QPS long aboveToken = restToken - warningToken; //当前状态下能达到的最高 QPS // current interval = restToken*slope+1/count double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); // 如果不会超过,那么通过,否则不通过 if (passQps + acquireCount <= warningQps) { return true; } } else { // count 是最高能达到的 QPS if (passQps + acquireCount <= count) { return true; } } return false; }

这个方法里通过syncToken(previousQps)设置storedTokens的值后,与警戒值做判断,如果没有达到警戒值,那么通过计算和警戒值的距离再加上slope计算出一个当前的QPS值,storedTokens越大当前的QPS越小。

如果当前的storedTokens已经小于警戒值了,说明已经预热完毕了,直接用count判断就好了。

WarmUpController#syncToken

protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); //去掉毫秒的时间 currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { return; } // 令牌数量的旧值 long oldValue = storedTokens.get(); // 计算新的令牌数量,往下看 long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { // 令牌数量上,减去上一分钟的 QPS,然后设置新值 long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); } }

这个方法通过coolDownTokens方法来获取一个新的value,然后通过CAS设置到storedTokens中,然后将storedTokens减去上一个窗口的QPS值,并为lastFilledTime设置一个新的值。

其实我这里有个疑惑,在用storedTokens减去上一个窗口的QPS的时候并没有做控制,假如处理的速度非常的快,在一个窗口内就减了很多次,直接把当前的storedTokens减到了小于warningToken,那么是不是就没有在一定的时间范围内启动冷启动的效果?

private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 添加令牌的判断前提条件: // 当令牌的消耗程度远远低于警戒线的时候 if (oldValue < warningToken) { // 根据count数每秒加上令牌 newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { //如果还在冷启动阶段 // 如果当前通过的 QPS 大于 count/coldFactor,说明系统消耗令牌的速度,大于冷却速度 // 那么不需要添加令牌,否则需要添加令牌 if (passQps < (int) count / coldFactor) { newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyywfs.html