接着,回到TimeWindowRegulatorListener#onAddInvocationStat中,会往下调用startRegulate方法。
/**
* 度量线程池
*/
private final ScheduledService
measureScheduler = new ScheduledService("AFT-MEASURE",
ScheduledService.MODE_FIXEDRATE,
new MeasureRunnable(), 1, 1,
TimeUnit.SECONDS);
public void startRegulate() {
if (measureStarted.compareAndSet(false, true)) {
measureScheduler.start();
}
}
ScheduledService是一个线程池,measureScheduler变量实例化了一个固定频率执行延迟线程池,会每1秒钟固定调用MeasureRunnable的run方法。
MeasureRunnable是TimeWindowRegulator的内部类:
private class MeasureRunnable implements Runnable {
@Override
public void run() {
measureCounter.incrementAndGet();
//遍历TimeWindowRegulatorListener加入的MeasureModel实例
for (MeasureModel measureModel : measureModels) {
try {
//时间窗口是10,也就是说默认每过10秒才能进入下面的方法。
if (isArriveTimeWindow(measureModel)) {
//ServiceHorizontalMeasureStrategy
MeasureResult measureResult = measureStrategy.measure(measureModel);
regulationExecutor.submit(new RegulationRunnable(measureResult));
}
} catch (Exception e) {
LOGGER.errorWithApp(measureModel.getAppName(), "Error when doMeasure: " + e.getMessage(), e);
}
}
}
private boolean isArriveTimeWindow(MeasureModel measureModel) {
//timeWindow默认是10
long timeWindow = FaultToleranceConfigManager.getTimeWindow(measureModel.getAppName());
return measureCounter.get() % timeWindow == 0;
}
}
我们先来到ServiceHorizontalMeasureStrategy#measure来看看是怎么判断为异常或正常
如何判断一个节点是异常还是正常
我们首先不看代码的实现,先白话的说明一下是如何实现的。
首先在FaultToleranceSubscriber#onEvent中收到同步或异步结果事件后,就会从工厂中获取这次调用的 InvokeStat(如果 InvokeStat 已经存在则直接返回,如果没有则创建新的并保持到缓存中)。通过调用 InvokeStat 的 invoke 和 catchException 方法统计调用次数和异常次数。
然后在MeasureRunnable方法中根据设置的窗口期,在到达窗口期的时候会从 MeasueModel 的各个 InvokeStat 创建一份镜像数据,表示当前串口内的调用情况。
对所有的节点进行度量,计算出所有节点的平均异常率,如果某个节点的异常率大于平均异常率到一定比例,则判定为异常。
我这里选用官方的例子来进行说明:
假如有三个节点,提供同一服务,调用次数和异常数如表格所示:
invokeCount
expCount
invokeStat 1
5
4
invokeStat 2
10
1
invokeStat 3
10
0
结合上述例子,度量策略的大致逻辑如下:
首先统计该服务下所有 ip 的平均异常率,并用 averageExceptionRate 表示。平均异常率比较好理解,即异常总数 / 总调用次数,上例中 averageExceptionRate =(1 + 4) / (5 + 10 + 10) = 0.2.
当某个ip的窗口调用次数小于该服务的最小窗口调用次数( leastWindCount )则忽略并将状态设置为 IGNOGRE。否则进行降级和恢复度量。 如 invokeStat 1 的 invokeCount 为5,如果 leastWindCount 设置为6 则 invokeStat 1 会被忽略。
当某个ip的 时间窗口内的异常率和服务平均异常比例 windowExceptionRate 大于 配置的 leastWindowExceptionRateMultiplte (最小时间窗口内异常率和服务平均异常率的降级比值),那么将该IP设置为 ABNORMAL, 否则设置为 HEALTH.
windowExceptionRate 是异常率和服务平均异常比例,invokeStat 1 的异常率为 4/5 = 0.8, 则其对应的 windowExceptionRate = 0.8 / 0.2 = 4. 假设 leastWindowExceptionRateMultiplte =4, 那么 invokeStat 1 是一次服务,则需要进行降级操作。
接下来我们来看具体的源码实现:
ServiceHorizontalMeasureStrategy#measure
public MeasureResult measure(MeasureModel measureModel) {
MeasureResult measureResult = new MeasureResult();
measureResult.setMeasureModel(measureModel);
String appName = measureModel.getAppName();
List<InvocationStat> stats = measureModel.getInvocationStats();
if (!CommonUtils.isNotEmpty(stats)) {
return measureResult;
}
//1
//这个方法主要是复制出一个当前时间点的调用情况,只统计被复制的InvocationStat
//如果有被新剔除的InvocationStat,则不会存在于该次获取结果中。
List<InvocationStat> invocationStats = getInvocationStatSnapshots(stats);
//FaultToleranceConfig的timeWindow所设置的,时间窗口,默认是10
long timeWindow = FaultToleranceConfigManager.getTimeWindow(appName);
/* leastWindowCount在同一次度量中保持不变*/
//默认InvocationStat如果要参与统计的窗口内最低调用次数,时间窗口内,至少调用的次数.在时间窗口内总共都不足10,认为不需要调控.
long leastWindowCount = FaultToleranceConfigManager.getLeastWindowCount(appName);
//最小是1,也就是时间窗口内,只要调用了就进行统计
leastWindowCount = leastWindowCount < LEGAL_LEAST_WINDOW_COUNT ? LEGAL_LEAST_WINDOW_COUNT
: leastWindowCount;
//2.
/* 计算平均异常率和度量单个ip的时候都需要使用到appWeight*/
double averageExceptionRate = calculateAverageExceptionRate(invocationStats, leastWindowCount);
//表示当前机器是平均异常率的多少倍才降级,默认是6
double leastWindowExceptionRateMultiple = FaultToleranceConfigManager
.getLeastWindowExceptionRateMultiple(appName);
for (InvocationStat invocationStat : invocationStats) {
MeasureResultDetail measureResultDetail = null;
InvocationStatDimension statDimension = invocationStat.getDimension();
long windowCount = invocationStat.getInvokeCount();
//3
//这里主要是根据Invocation的实际权重计算该Invocation的实际最小窗口调用次数
long invocationLeastWindowCount = getInvocationLeastWindowCount(invocationStat,
ProviderInfoWeightManager.getWeight(statDimension.getProviderInfo()),
leastWindowCount);
//4
//当总调用的次数为0的时候,averageExceptionRate =-1,这个时候可以设置为忽略
if (averageExceptionRate == -1) {
measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.IGNORE);
} else {
if (invocationLeastWindowCount != -1 && windowCount >= invocationLeastWindowCount) {
//获取异常率
double windowExceptionRate = invocationStat.getExceptionRate();
//没有异常的情况,设置状态为健康
if (averageExceptionRate == 0) {
measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.HEALTH);
} else {
//5
//这里主要是看这次被遍历到invocationStat的异常率和平均异常率之比
double windowExceptionRateMultiple = CalculateUtils.divide(
windowExceptionRate, averageExceptionRate);
//如果当前的invocationStat的异常是平均异常的6倍,那么就设置状态为异常
measureResultDetail = windowExceptionRateMultiple >= leastWindowExceptionRateMultiple ?
new MeasureResultDetail(statDimension, MeasureState.ABNORMAL) :
new MeasureResultDetail(statDimension, MeasureState.HEALTH);
}
measureResultDetail.setAbnormalRate(windowExceptionRate);
measureResultDetail.setAverageAbnormalRate(averageExceptionRate);
measureResultDetail.setLeastAbnormalRateMultiple(leastWindowExceptionRateMultiple);
} else {
measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.IGNORE);
}
}
measureResultDetail.setWindowCount(windowCount);
measureResultDetail.setTimeWindow(timeWindow);
measureResultDetail.setLeastWindowCount(invocationLeastWindowCount);
measureResult.addMeasureDetail(measureResultDetail);
}
//打日志
logMeasureResult(measureResult, timeWindow, leastWindowCount, averageExceptionRate,
leastWindowExceptionRateMultiple);
InvocationStatFactory.updateInvocationStats(invocationStats);
return measureResult;
}
上面这个方法有点长,我给这个方法标注了数字,跟着数字标记去看。