调用data的currentWindow方法会调用到LeapArray的currentWindow方法中去:
LeapArray#currentWindow
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//通过当前时间判断属于哪个窗口
int idx = calculateTimeIdx(timeMillis);
//计算出窗口开始时间
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//获取数组里的老数据
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 如果对应时间窗口的开始时间与计算得到的开始时间一样
// 那么代表当前即是我们要找的窗口对象,直接返回
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
//如果当前的开始时间小于原开始时间,那么就更新到新的开始时间
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
//一般来说不会走到这里
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
这里我简单介绍一下这个方法,这个方法的详细讲解已经在第一篇源码分析里做了。
这个方法里面会根据当前的时间戳来计算出array数组里面的index,然后去array数组中找相应的数据,如果节点已经存在,那么用CAS更新一个新的节点;如果节点是新的,那么直接返回;如果节点失效了,设置当前节点,清除所有失效节点。
这里我直接引用1.Sentinel源码分析—FlowRuleManager加载规则做了什么?中的例子:
1. 如果array数据里面的bucket数据如下所示:
NULL
B4
|_______|_______|
800
1000 1200
^
time=888
正好当前时间所对应的槽位里面的数据是空的,那么就用CAS更新
2. 如果array里面已经有数据了,并且槽位里面的窗口开始时间和当前的开始时间相等,那么直接返回
B3
B4
||_______|_______||___
800
1000 1200 timestamp
^
time=888
3. 例如当前时间是1676,所对应窗口里面的数据的窗口开始时间小于当前的窗口开始时间,那么加上锁,然后设置槽位的窗口开始时间为当前窗口开始时间,并把槽位里面的数据重置
(old)
B0
|_______||_______|
... 1200
1400
^
time=1676
再回到ArrayMetric的success方法中,往下走调用data.values()方法:
LeapArray#success
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
这个方法就是用来获取所有有效的MetricBucket,并返回。
然后通过调用MetricBucket的success方法获取被成功调用的次数。
我们接着来看ArrayMetric的rt方法:
public long rt() {
data.currentWindow();
long rt = 0;
//获取当前时间窗口的统计数据
List<MetricBucket> list = data.values();
//统计当前时间窗口的平均相应时间之和
for (MetricBucket window : list) {
rt += window.rt();
}
return rt;
}
这个方法和上面的success方法差不多,获取所有的MetricBucket的rt数据求和返回。
然后就可以通过rt方法返回的时间总和除以成功调用的总和求得平均数。
我们再回到DegradeRule的passCheck方法中的响应时间降级策略中:
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//获取节点的平均响应时间
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
//rtSlowRequestAmount默认是5
// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
// 根据异常比例降级
}
//省略
return false;