ServiceHorizontalRegulationStrategy#isReachMaxDegradeIpCount
public boolean isReachMaxDegradeIpCount(MeasureResultDetail measureResultDetail) {
    InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
    ConcurrentHashSet<String> ips = getDegradeProviders(statDimension.getDimensionKey());
    String ip = statDimension.getIp();
    if (ips.contains(ip)) {
        return false;
    } else {
        // maximum number of IPs that can be regulated for one service (default value)
        int degradeMaxIpCount = FaultToleranceConfigManager.getDegradeMaxIpCount(statDimension.getAppName());
        ipsLock.lock();
        try {
            if (ips.size() < degradeMaxIpCount) {
                ips.add(ip);
                return false;
            } else {
                return true;
            }
        } finally {
            ipsLock.unlock();
        }
    }
}
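This method caps how many nodes under a single service can be regulated. For example, suppose a service has only 3 nodes and 2 of them have misbehaved and already been regulated. The third node should not be regulated as well. At that point someone has to step in manually and find out why the service is failing this way.
Next, WeightDegradeStrategy#degrade is called to lower the node's weight.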
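To make the cap easier to see in isolation, here is a minimal sketch that mirrors the logic above with plain JDK types. The class name DegradeIpCapSketch, the String-only signature, and the cap of 2 are assumptions made for illustration; they are not part of SOFARPC.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the per-service cap on regulated IPs. */
final class DegradeIpCapSketch {
    private final Set<String> degradedIps = ConcurrentHashMap.newKeySet();
    private final ReentrantLock lock = new ReentrantLock();
    private final int maxDegradeIpCount; // illustrative value, passed in by the caller

    DegradeIpCapSketch(int maxDegradeIpCount) {
        this.maxDegradeIpCount = maxDegradeIpCount;
    }

    /** Returns true when the cap is already reached and this IP must not be regulated. */
    boolean isReachMaxDegradeIpCount(String ip) {
        if (degradedIps.contains(ip)) {
            return false; // already tracked, so it does not take another slot
        }
        lock.lock();
        try {
            if (degradedIps.size() < maxDegradeIpCount) {
                degradedIps.add(ip); // take a slot for this IP
                return false;
            }
            return true; // no slot left: leave this node alone
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        DegradeIpCapSketch cap = new DegradeIpCapSketch(2);
        System.out.println(cap.isReachMaxDegradeIpCount("10.0.0.1")); // false
        System.out.println(cap.isReachMaxDegradeIpCount("10.0.0.2")); // false
        System.out.println(cap.isReachMaxDegradeIpCount("10.0.0.3")); // true
        System.out.println(cap.isReachMaxDegradeIpCount("10.0.0.1")); // false
    }
}
```

With a cap of 2, the third distinct IP is refused, while an IP that is already in the set is still allowed through.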
WeightDegradeStrategy#degrade
public void degrade(MeasureResultDetail measureResultDetail) {
    // calls the LogPrintDegradeStrategy implementation, which only prints a log
    super.degrade(measureResultDetail);
    if (measureResultDetail.isLogOnly()) {
        return;
    }
    InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
    String appName = statDimension.getAppName();
    ProviderInfo providerInfo = statDimension.getProviderInfo();
    // if provider is removed or provider is warming up
    // return immediately if it is null or still warming up
    if (providerInfo == null || providerInfo.getStatus() == ProviderStatus.WARMING_UP) {
        return;
    }
    // current weight of the provider
    int currentWeight = ProviderInfoWeightManager.getWeight(providerInfo);
    // degrade rate
    double weightDegradeRate = FaultToleranceConfigManager.getWeightDegradeRate(appName);
    // minimum weight, 1 by default
    int degradeLeastWeight = FaultToleranceConfigManager.getDegradeLeastWeight(appName);
    // degrade rate * current weight
    int degradeWeight = CalculateUtils.multiply(currentWeight, weightDegradeRate);
    // must not drop below the minimum
    degradeWeight = degradeWeight < degradeLeastWeight ? degradeLeastWeight : degradeWeight;
    // degrade weight of this provider info
    boolean success = ProviderInfoWeightManager.degradeWeight(providerInfo, degradeWeight);
    if (success && LOGGER.isInfoEnabled(appName)) {
        LOGGER.infoWithApp(appName, "the weight was degraded. serviceUniqueName:["
            + statDimension.getService() + "],ip:["
            + statDimension.getIp() + "],origin weight:["
            + currentWeight + "],degraded weight:["
            + degradeWeight + "].");
    }
}
// ProviderInfoWeightManager
public static boolean degradeWeight(ProviderInfo providerInfo, int weight) {
    providerInfo.setStatus(ProviderStatus.DEGRADED);
    providerInfo.setWeight(weight);
    return true;
}
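In short, this method reads the current weight, multiplies it by the configured degrade rate, and makes sure the result does not fall below the minimum weight.
Finally it calls ProviderInfoWeightManager to mark the node as DEGRADED and apply the new weight.
If the node is healthy
ServiceHorizontalRegulationStrategy#isExistInTheDegradeList is called to check whether the current node has been degraded.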
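The arithmetic is easy to try out on its own. The sketch below assumes that CalculateUtils.multiply truncates the product to an int, and it uses a degrade rate of 0.5 purely as an illustrative value, not a SOFARPC default. The class WeightDegradeSketch is hypothetical.

```java
/** Hypothetical helper that reproduces only the weight arithmetic of degrade(). */
final class WeightDegradeSketch {

    /** Scales the weight by the rate (truncating, by assumption) and clamps it to the minimum. */
    static int degradedWeight(int currentWeight, double degradeRate, int leastWeight) {
        int degradeWeight = (int) (currentWeight * degradeRate);
        return degradeWeight < leastWeight ? leastWeight : degradeWeight;
    }

    public static void main(String[] args) {
        int weight = 100;
        // Degrade repeatedly with an illustrative rate of 0.5 and a minimum weight of 1.
        for (int round = 1; round <= 8; round++) {
            weight = degradedWeight(weight, 0.5, 1);
            System.out.println("round " + round + " -> " + weight);
        }
    }
}
```

Starting from 100, the weight goes 50, 25, 12, 6, 3, 1 and then stays at 1, because the minimum weight stops it from ever reaching 0.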
ServiceHorizontalRegulationStrategy#isExistInTheDegradeList
public boolean isExistInTheDegradeList(MeasureResultDetail measureResultDetail) {
    InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
    ConcurrentHashSet<String> ips = getDegradeProviders(statDimension.getDimensionKey());
    return ips != null && ips.contains(statDimension.getIp());
}
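When isReachMaxDegradeIpCount runs, it adds the IP that is about to be degraded to the ips set, so here a simple lookup in that set is enough.
If the node has already been degraded, WeightRecoverStrategy#recover is called to recover it.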
WeightRecoverStrategy#recover
public void recover(MeasureResultDetail measureResultDetail) {
    InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension();
    ProviderInfo providerInfo = statDimension.getProviderInfo();
    // if provider is removed or provider is warming up
    if (providerInfo == null || providerInfo.getStatus() == ProviderStatus.WARMING_UP) {
        return;
    }
    Integer currentWeight = ProviderInfoWeightManager.getWeight(providerInfo);
    if (currentWeight == -1) {
        return;
    }
    String appName = statDimension.getAppName();
    // 2 by default
    double weightRecoverRate = FaultToleranceConfigManager.getWeightRecoverRate(appName);
    // so one call can at most double the weight; it does not jump straight back to originWeight
    int recoverWeight = CalculateUtils.multiply(currentWeight, weightRecoverRate);
    int originWeight = statDimension.getOriginWeight();
    // recover weight of this provider info
    if (recoverWeight >= originWeight) {
        measureResultDetail.setRecoveredOriginWeight(true);
        // set the provider status to AVAILABLE and restore the weight
        ProviderInfoWeightManager.recoverOriginWeight(providerInfo, originWeight);
        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "the weight was recovered to origin value. serviceUniqueName:["
                + statDimension.getService() + "],ip:["
                + statDimension.getIp() + "],origin weight:["
                + currentWeight + "],recover weight:["
                + originWeight + "].");
        }
    } else {
        measureResultDetail.setRecoveredOriginWeight(false);
        boolean success = ProviderInfoWeightManager.recoverWeight(providerInfo, recoverWeight);
        if (success && LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "the weight was recovered. serviceUniqueName:["
                + statDimension.getService() + "],ip:["
                + statDimension.getIp() + "],origin weight:["
                + currentWeight + "],recover weight:["
                + recoverWeight + "].");
        }
    }
}
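This method is straightforward; the comments in the code above explain each step.
Summary
Overall, FaultToleranceModule consists of two parts:
FaultToleranceSubscriber, which subscribes to events and handles both synchronous and asynchronous result events.
The regulation logic, which collects statistics from the invocation events and uses the built-in strategies to degrade and recover services.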
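The step-by-step recovery can be sketched outside the framework. The example assumes the default recover rate of 2 mentioned in the comments, again assumes that CalculateUtils.multiply truncates to an int, and uses a hypothetical class WeightRecoverSketch with made-up weights of 5 and 100.

```java
/** Hypothetical helper that reproduces only the weight arithmetic of recover(). */
final class WeightRecoverSketch {

    /** Returns the weight after one recovery step, capped at the original weight. */
    static int recoverOnce(int currentWeight, double recoverRate, int originWeight) {
        int recoverWeight = (int) (currentWeight * recoverRate);
        return recoverWeight >= originWeight ? originWeight : recoverWeight;
    }

    public static void main(String[] args) {
        int origin = 100; // illustrative original weight
        int weight = 5;   // illustrative degraded weight
        while (weight < origin) {
            weight = recoverOnce(weight, 2.0, origin);
            System.out.println(weight);
        }
    }
}
```

From a degraded weight of 5, the node needs five recovery steps (10, 20, 40, 80, 100) to get back to its original weight of 100, so a recovering node takes traffic back gradually rather than all at once.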