7.源码分析---SOFARPC是如何实现故障剔除的? (5)

RouterChain#route

public List<ProviderInfo> route(SofaRequest request, List<ProviderInfo> providerInfos) { for (Router router : routers) { providerInfos = router.route(request, providerInfos); } return providerInfos; } //RegistryRouter#route public List<ProviderInfo> route(SofaRequest request, List<ProviderInfo> providerInfos) { //has address. FIXME if (CommonUtils.isNotEmpty(providerInfos)) { return providerInfos; } AddressHolder addressHolder = consumerBootstrap.getCluster().getAddressHolder(); if (addressHolder != null) { List<ProviderInfo> current = addressHolder.getProviderInfos(RpcConstants.ADDRESS_DEFAULT_GROUP); if (providerInfos != null) { providerInfos.addAll(current); } else { providerInfos = current; } } recordRouterWay(RPC_REGISTRY_ROUTER); return providerInfos; }

我们这里考虑RegistryRouter进行路由选择的情况。
在RegistryRouter#route里面首先获取addressHolder,调用其实现类SingleGroupAddressHolder

SingleGroupAddressHolder#getProviderInfos

/** * 配置的直连地址列表 */ protected ProviderGroup directUrlGroup; /** * 注册中心来的地址列表 */ protected ProviderGroup registryGroup; private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private Lock rLock = lock.readLock(); public List<ProviderInfo> getProviderInfos(String groupName) { rLock.lock(); try { // 复制一份 return new ArrayList<ProviderInfo>(getProviderGroup(groupName).getProviderInfos()); } finally { rLock.unlock(); } } public ProviderGroup getProviderGroup(String groupName) { rLock.lock(); try { return RpcConstants.ADDRESS_DIRECT_GROUP.equals(groupName) ? directUrlGroup : registryGroup; } finally { rLock.unlock(); } }

这里用的是读写锁,也就是说,在读的时候可以并发读,但是不允许读的时候有写的操作。然后根据groupName获取到相应的直连集合。

实现故障剔除,筛选合适的provider do { // 再进行负载均衡筛选,默认使用RandomLoadBalancer providerInfo = loadBalancer.select(message, providerInfos); ClientTransport transport = selectByProvider(message, providerInfo); if (transport != null) { return providerInfo; } providerInfos.remove(providerInfo); } while (!providerInfos.isEmpty());

这里是真正实现了故障剔除的方法,负载均衡我已经在上一篇已经分析过了,这里不再赘述,所以我们直接看到selectByProvider方法中

protected ClientTransport selectByProvider(SofaRequest message, ProviderInfo providerInfo) { ClientTransport transport = connectionHolder.getAvailableClientTransport(providerInfo); if (transport != null) { if (transport.isAvailable()) { lastProviderInfo = providerInfo; checkAlias(providerInfo, message); //检查分组 return transport; } else { connectionHolder.setUnavailable(providerInfo, transport); } } return null; } //AllConnectConnectionHolder#getAvailableClientTransport public ClientTransport getAvailableClientTransport(ProviderInfo providerInfo) { // 先去存活列表 ClientTransport transport = aliveConnections.get(providerInfo); if (transport != null) { return transport; } // 再去亚健康列表 这个列表暂时没有实现的地方 transport = subHealthConnections.get(providerInfo); if (transport != null) { return transport; } // 最后看看是否第一次调用未初始化 transport = uninitializedConnections.get(providerInfo); if (transport != null) { // 未初始化则初始化,这里是lazy为ture的情况,延迟初始化 synchronized (this) { transport = uninitializedConnections.get(providerInfo); if (transport != null) { initClientTransport(consumerConfig.getInterfaceId(), providerInfo, transport); uninitializedConnections.remove(providerInfo); } return getAvailableClientTransport(providerInfo); } } return null; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdxfs.html