7.源码分析---SOFARPC是如何实现故障剔除的? (4)

DefaultConsumerBootstrap#updateAllProviders

public void updateAllProviders(List<ProviderGroup> providerGroups) { List<ProviderGroup> oldProviderGroups = new ArrayList<ProviderGroup>(addressHolder.getProviderGroups()); int count = 0; if (providerGroups != null) { for (ProviderGroup providerGroup : providerGroups) { //校验检查providerGroup里面的元素是不是为空 //消费者的配置的protocol是不是和provider 的protocol相同 checkProviderInfo(providerGroup); count += providerGroup.size(); } } //走到这里说明没有provider if (count == 0) { Collection<ProviderInfo> currentProviderList = currentProviderList(); addressHolder.updateAllProviders(providerGroups); if (CommonUtils.isNotEmpty(currentProviderList)) { if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) { LOGGER.warnWithApp(consumerConfig.getAppName(), "Provider list is emptied, may be all " + "providers has been closed, or this consumer has been add to blacklist"); closeTransports(); } } } else { addressHolder.updateAllProviders(providerGroups); connectionHolder.updateAllProviders(providerGroups); } if (EventBus.isEnable(ProviderInfoUpdateAllEvent.class)) { ProviderInfoUpdateAllEvent event = new ProviderInfoUpdateAllEvent(consumerConfig, oldProviderGroups, providerGroups); EventBus.post(event); } }

其实这个方法里面就做了一件事情,那就是把provider放入到addressHolder和connectionHolder中。

故障剔除

客户端在引用的时候会调用FailoverCluster#doInvoke方法,然后调用父类的select进行路由和负载均衡选用合适的provider。

AbstractCluster#doInvoke public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException { String methodName = request.getMethodName(); int retries = consumerConfig.getMethodRetries(methodName); int time = 0; SofaRpcException throwable = null;// 异常日志 List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1); do { //负载均衡 ProviderInfo providerInfo = select(request, invokedProviderInfos); try { //调用过滤器链 SofaResponse response = filterChain(providerInfo, request); if (response != null) { if (throwable != null) { if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) { LOGGER.warnWithApp(consumerConfig.getAppName(), LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY, throwable.getClass() + ":" + throwable.getMessage(), invokedProviderInfos)); } } return response; } else { throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Failed to call " + request.getInterfaceName() + "." + methodName + " on remote server " + providerInfo + ", return null"); time++; } } catch (SofaRpcException e) { // 服务端异常+ 超时异常 才发起rpc异常重试 if (e.getErrorType() == RpcErrorType.SERVER_BUSY || e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) { throwable = e; time++; } else { throw e; } } catch (Exception e) { // 其它异常不重试 throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Failed to call " + request.getInterfaceName() + "." + request.getMethodName() + " on remote server: " + providerInfo + ", cause by unknown exception: " + e.getClass().getName() + ", message is: " + e.getMessage(), e); } finally { if (RpcInternalContext.isAttachmentEnable()) { RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES, time + 1); // 重试次数 } } invokedProviderInfos.add(providerInfo); } while (time <= retries); throw throwable; }

这个方法需要注意的是,一开始invokedProviderInfos集合是空的,如果调用完后没有返回response,而是抛出异常了,那么就会把这个抛出异常的provider实例加入到invokedProviderInfos集合。这个集合会在select方法里面用到。

AbstractCluster#select

客户端在引用服务端的时候会通过路由找到所有的provider,然后进行剔除。路由是在调用AbstractCluster#select的时候做的。所以我们先看看这个方法。

protected ProviderInfo select(SofaRequest message, List<ProviderInfo> invokedProviderInfos) throws SofaRpcException { // 粘滞连接,当前连接可用 if (consumerConfig.isSticky()) { //这个变量会在下面的selectByProvider方法为其赋值 if (lastProviderInfo != null) { ProviderInfo providerInfo = lastProviderInfo; ClientTransport lastTransport = connectionHolder.getAvailableClientTransport(providerInfo); if (lastTransport != null && lastTransport.isAvailable()) { checkAlias(providerInfo, message); return providerInfo; } } } // 原始服务列表数据 --> 路由结果 List<ProviderInfo> providerInfos = routerChain.route(message, null); //保存一下原始地址,为了打印 List<ProviderInfo> orginalProviderInfos = new ArrayList<ProviderInfo>(providerInfos); if (CommonUtils.isEmpty(providerInfos)) { throw noAvailableProviderException(message.getTargetServiceUniqueName()); } //invokedProviderInfos保存的是重试的provider,说明该provider已经调用过,并且失败了 //所以在这里排除 if (CommonUtils.isNotEmpty(invokedProviderInfos) && providerInfos.size() > invokedProviderInfos.size()) { // 总数大于已调用数 providerInfos.removeAll(invokedProviderInfos);// 已经调用异常的本次不再重试 } String targetIP = null; ProviderInfo providerInfo; RpcInternalContext context = RpcInternalContext.peekContext(); if (context != null) { targetIP = (String) RpcInternalContext.getContext().getAttachment(RpcConstants.HIDDEN_KEY_PINPOINT); } if (StringUtils.isNotBlank(targetIP)) { // 如果指定了调用地址 providerInfo = selectPinpointProvider(targetIP, providerInfos); if (providerInfo == null) { // 指定的不存在 throw unavailableProviderException(message.getTargetServiceUniqueName(), targetIP); } ClientTransport clientTransport = selectByProvider(message, providerInfo); if (clientTransport == null) { // 指定的不存在或已死,抛出异常 throw unavailableProviderException(message.getTargetServiceUniqueName(), targetIP); } return providerInfo; } else { do { // 再进行负载均衡筛选,默认使用RandomLoadBalancer providerInfo = loadBalancer.select(message, providerInfos); ClientTransport transport = selectByProvider(message, providerInfo); if (transport != null) { return providerInfo; } providerInfos.remove(providerInfo); } while (!providerInfos.isEmpty()); } throw unavailableProviderException(message.getTargetServiceUniqueName(), convertProviders2Urls(orginalProviderInfos)); }

这个方法主要做了如下几件事:

如果设置了粘滞连接,那么会继续调用上一次使用过的provider

调用router获取原始服务列表数据

如果invokedProviderInfos不为空的话,原始服务列表里面需要剔除掉这些provider

如果设置了直连,那么调用selectPinpointProvider获取选定的provider,不存在故障剔除

没有设置直连,则循环调用筛选

路由筛选porvider

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdxfs.html