9.源码分析---SOFARPC是如何实现故障剔除的? (5)

接着,回到TimeWindowRegulatorListener#onAddInvocationStat中,会往下调用startRegulate方法。

/** * 度量线程池 */ private final ScheduledService measureScheduler = new ScheduledService("AFT-MEASURE", ScheduledService.MODE_FIXEDRATE, new MeasureRunnable(), 1, 1, TimeUnit.SECONDS); public void startRegulate() { if (measureStarted.compareAndSet(false, true)) { measureScheduler.start(); } }

ScheduledService是一个线程池,measureScheduler变量实例化了一个固定频率执行延迟线程池,会每1秒钟固定调用MeasureRunnable的run方法。

MeasureRunnable是TimeWindowRegulator的内部类:

private class MeasureRunnable implements Runnable { @Override public void run() { measureCounter.incrementAndGet(); //遍历TimeWindowRegulatorListener加入的MeasureModel实例 for (MeasureModel measureModel : measureModels) { try { //时间窗口是10,也就是说默认每过10秒才能进入下面的方法。 if (isArriveTimeWindow(measureModel)) { //ServiceHorizontalMeasureStrategy MeasureResult measureResult = measureStrategy.measure(measureModel); regulationExecutor.submit(new RegulationRunnable(measureResult)); } } catch (Exception e) { LOGGER.errorWithApp(measureModel.getAppName(), "Error when doMeasure: " + e.getMessage(), e); } } } private boolean isArriveTimeWindow(MeasureModel measureModel) { //timeWindow默认是10 long timeWindow = FaultToleranceConfigManager.getTimeWindow(measureModel.getAppName()); return measureCounter.get() % timeWindow == 0; } }

我们先来到ServiceHorizontalMeasureStrategy#measure来看看是怎么判断为异常或正常

如何判断一个节点是异常还是正常

我们首先不看代码的实现,先白话的说明一下是如何实现的。

首先在FaultToleranceSubscriber#onEvent中收到同步或异步结果事件后,就会从工厂中获取这次调用的 InvokeStat(如果 InvokeStat 已经存在则直接返回,如果没有则创建新的并保持到缓存中)。通过调用 InvokeStat 的 invoke 和 catchException 方法统计调用次数和异常次数。

然后在MeasureRunnable方法中根据设置的窗口期,在到达窗口期的时候会从 MeasueModel 的各个 InvokeStat 创建一份镜像数据,表示当前串口内的调用情况。

对所有的节点进行度量,计算出所有节点的平均异常率,如果某个节点的异常率大于平均异常率到一定比例,则判定为异常。

我这里选用官方的例子来进行说明:
假如有三个节点,提供同一服务,调用次数和异常数如表格所示:

invokeCount expCount
invokeStat 1   5   4  
invokeStat 2   10   1  
invokeStat 3   10   0  

结合上述例子,度量策略的大致逻辑如下:

首先统计该服务下所有 ip 的平均异常率,并用 averageExceptionRate 表示。平均异常率比较好理解,即异常总数 / 总调用次数,上例中 averageExceptionRate =(1 + 4) / (5 + 10 + 10) = 0.2.

当某个ip的窗口调用次数小于该服务的最小窗口调用次数( leastWindCount )则忽略并将状态设置为 IGNOGRE。否则进行降级和恢复度量。 如 invokeStat 1 的 invokeCount 为5,如果 leastWindCount 设置为6 则 invokeStat 1 会被忽略。

当某个ip的 时间窗口内的异常率和服务平均异常比例 windowExceptionRate 大于 配置的 leastWindowExceptionRateMultiplte (最小时间窗口内异常率和服务平均异常率的降级比值),那么将该IP设置为 ABNORMAL, 否则设置为 HEALTH.

windowExceptionRate 是异常率和服务平均异常比例,invokeStat 1 的异常率为 4/5 = 0.8, 则其对应的 windowExceptionRate = 0.8 / 0.2 = 4. 假设 leastWindowExceptionRateMultiplte =4, 那么 invokeStat 1 是一次服务,则需要进行降级操作。

接下来我们来看具体的源码实现:
ServiceHorizontalMeasureStrategy#measure

public MeasureResult measure(MeasureModel measureModel) { MeasureResult measureResult = new MeasureResult(); measureResult.setMeasureModel(measureModel); String appName = measureModel.getAppName(); List<InvocationStat> stats = measureModel.getInvocationStats(); if (!CommonUtils.isNotEmpty(stats)) { return measureResult; } //1 //这个方法主要是复制出一个当前时间点的调用情况,只统计被复制的InvocationStat //如果有被新剔除的InvocationStat,则不会存在于该次获取结果中。 List<InvocationStat> invocationStats = getInvocationStatSnapshots(stats); //FaultToleranceConfig的timeWindow所设置的,时间窗口,默认是10 long timeWindow = FaultToleranceConfigManager.getTimeWindow(appName); /* leastWindowCount在同一次度量中保持不变*/ //默认InvocationStat如果要参与统计的窗口内最低调用次数,时间窗口内,至少调用的次数.在时间窗口内总共都不足10,认为不需要调控. long leastWindowCount = FaultToleranceConfigManager.getLeastWindowCount(appName); //最小是1,也就是时间窗口内,只要调用了就进行统计 leastWindowCount = leastWindowCount < LEGAL_LEAST_WINDOW_COUNT ? LEGAL_LEAST_WINDOW_COUNT : leastWindowCount; //2. /* 计算平均异常率和度量单个ip的时候都需要使用到appWeight*/ double averageExceptionRate = calculateAverageExceptionRate(invocationStats, leastWindowCount); //表示当前机器是平均异常率的多少倍才降级,默认是6 double leastWindowExceptionRateMultiple = FaultToleranceConfigManager .getLeastWindowExceptionRateMultiple(appName); for (InvocationStat invocationStat : invocationStats) { MeasureResultDetail measureResultDetail = null; InvocationStatDimension statDimension = invocationStat.getDimension(); long windowCount = invocationStat.getInvokeCount(); //3 //这里主要是根据Invocation的实际权重计算该Invocation的实际最小窗口调用次数 long invocationLeastWindowCount = getInvocationLeastWindowCount(invocationStat, ProviderInfoWeightManager.getWeight(statDimension.getProviderInfo()), leastWindowCount); //4 //当总调用的次数为0的时候,averageExceptionRate =-1,这个时候可以设置为忽略 if (averageExceptionRate == -1) { measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.IGNORE); } else { if (invocationLeastWindowCount != -1 && windowCount >= invocationLeastWindowCount) { //获取异常率 double windowExceptionRate = invocationStat.getExceptionRate(); //没有异常的情况,设置状态为健康 if (averageExceptionRate == 0) { measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.HEALTH); } else { //5 //这里主要是看这次被遍历到invocationStat的异常率和平均异常率之比 double windowExceptionRateMultiple = CalculateUtils.divide( windowExceptionRate, averageExceptionRate); //如果当前的invocationStat的异常是平均异常的6倍,那么就设置状态为异常 measureResultDetail = windowExceptionRateMultiple >= leastWindowExceptionRateMultiple ? new MeasureResultDetail(statDimension, MeasureState.ABNORMAL) : new MeasureResultDetail(statDimension, MeasureState.HEALTH); } measureResultDetail.setAbnormalRate(windowExceptionRate); measureResultDetail.setAverageAbnormalRate(averageExceptionRate); measureResultDetail.setLeastAbnormalRateMultiple(leastWindowExceptionRateMultiple); } else { measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.IGNORE); } } measureResultDetail.setWindowCount(windowCount); measureResultDetail.setTimeWindow(timeWindow); measureResultDetail.setLeastWindowCount(invocationLeastWindowCount); measureResult.addMeasureDetail(measureResultDetail); } //打日志 logMeasureResult(measureResult, timeWindow, leastWindowCount, averageExceptionRate, leastWindowExceptionRateMultiple); InvocationStatFactory.updateInvocationStats(invocationStats); return measureResult; }

上面这个方法有点长,我给这个方法标注了数字,跟着数字标记去看。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppwdd.html