7. Source Code Analysis: How Does SOFARPC Implement Fault Removal? (3)

DefaultConsumerBootstrap#subscribe

public List<ProviderGroup> subscribe() {
    List<ProviderGroup> result = null;
    String directUrl = consumerConfig.getDirectUrl();
    if (StringUtils.isNotEmpty(directUrl)) {
        // A direct-connection URL is configured
        result = subscribeFromDirectUrl(directUrl);
    } else {
        // No direct-connection URL is configured
        List<RegistryConfig> registryConfigs = consumerConfig.getRegistry();
        if (CommonUtils.isNotEmpty(registryConfigs)) {
            // Subscribe to the service list from multiple registries
            result = subscribeFromRegistries();
        }
    }
    return result;
}

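There are two branches here:

If a direct-connection address is set on the client, subscribeFromDirectUrl is called.

If no direct-connection address is configured, the registry configuration is read and subscribeFromRegistries is called. Note that if no registry is configured either, subscribe returns null.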

DefaultConsumerBootstrap#subscribeFromDirectUrl

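This method splits the direct-connection address on "," or ";", wraps each entry in a ProviderInfo, and puts them all into the direct-connection provider group.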

protected List<ProviderGroup> subscribeFromDirectUrl(String directUrl) {
    List<ProviderGroup> result = new ArrayList<ProviderGroup>();
    List<ProviderInfo> tmpProviderInfoList = new ArrayList<ProviderInfo>();
    // Split the URL string; multiple URLs can be separated by "," or ";"
    String[] providerStrs = StringUtils.splitWithCommaOrSemicolon(directUrl);
    for (String providerStr : providerStrs) {
        ProviderInfo providerInfo = convertToProviderInfo(providerStr);
        if (providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_SOURCE) == null) {
            providerInfo.setStaticAttr(ProviderInfoAttrs.ATTR_SOURCE, "direct");
        }
        tmpProviderInfoList.add(providerInfo);
    }
    // Add to the direct-connection group
    result.add(new ProviderGroup(RpcConstants.ADDRESS_DIRECT_GROUP, tmpProviderInfoList));
    return result;
}
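To make the splitting step concrete, here is a minimal standalone sketch that does not depend on SOFARPC. The class DirectUrlSplitSketch and the method splitDirectUrl are hypothetical names used only for illustration, and the trimming and skipping of empty entries are assumptions of this sketch rather than a statement about how StringUtils.splitWithCommaOrSemicolon is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of SOFARPC.
class DirectUrlSplitSketch {

    // Splits a direct-connection string on ',' or ';'.
    // Assumption of this sketch: entries are trimmed and empty entries are skipped.
    static List<String> splitDirectUrl(String directUrl) {
        List<String> addresses = new ArrayList<>();
        if (directUrl == null) {
            return addresses;
        }
        for (String part : directUrl.split("[,;]")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                addresses.add(trimmed);
            }
        }
        return addresses;
    }

    public static void main(String[] args) {
        // Two addresses separated by ';' and one more separated by ','
        System.out.println(splitDirectUrl("bolt://127.0.0.1:12200;bolt://127.0.0.1:12201,bolt://127.0.0.1:12202"));
    }
}
```

In the real method each resulting string is then converted to a ProviderInfo and tagged with the source "direct" if no source attribute is present.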

DefaultConsumerBootstrap#subscribeFromRegistries

protected List<ProviderGroup> subscribeFromRegistries() {
    List<ProviderGroup> result = new ArrayList<ProviderGroup>();
    List<RegistryConfig> registryConfigs = consumerConfig.getRegistry();
    // No registry configured: return immediately
    if (CommonUtils.isEmpty(registryConfigs)) {
        return result;
    }
    // Whether to wait for results
    int addressWaitTime = consumerConfig.getAddressWait();
    int maxAddressWaitTime = SofaConfigs.getIntegerValue(consumerConfig.getAppName(),
        SofaOptions.CONFIG_MAX_ADDRESS_WAIT_TIME, SofaOptions.MAX_ADDRESS_WAIT_TIME);
    addressWaitTime = addressWaitTime < 0 ? maxAddressWaitTime : Math.min(addressWaitTime, maxAddressWaitTime);

    ProviderInfoListener listener = consumerConfig.getProviderInfoListener();
    // CountDownLatch used to wait for the registries to respond
    respondRegistries = addressWaitTime == 0 ? null : new CountDownLatch(registryConfigs.size());

    // Subscribe from the registries {groupName: ProviderGroup}
    Map<String, ProviderGroup> tmpProviderInfoList = new HashMap<String, ProviderGroup>();
    for (RegistryConfig registryConfig : registryConfigs) {
        Registry registry = RegistryFactory.getRegistry(registryConfig);
        registry.init();
        registry.start();

        try {
            List<ProviderGroup> current;
            try {
                if (respondRegistries != null) {
                    consumerConfig.setProviderInfoListener(
                        new WrapperClusterProviderInfoListener(listener, respondRegistries));
                }
                current = registry.subscribe(consumerConfig);
            } finally {
                if (respondRegistries != null) {
                    consumerConfig.setProviderInfoListener(listener);
                }
            }
            if (current == null) {
                continue; // No result was returned synchronously
            } else {
                if (respondRegistries != null) {
                    respondRegistries.countDown();
                }
            }
            for (ProviderGroup group : current) { // Groups from the current registry
                String groupName = group.getName();
                if (!group.isEmpty()) {
                    ProviderGroup oldGroup = tmpProviderInfoList.get(groupName);
                    if (oldGroup != null) {
                        oldGroup.addAll(group.getProviderInfos());
                    } else {
                        tmpProviderInfoList.put(groupName, group);
                    }
                }
            }
        } catch (SofaRpcRuntimeException e) {
            throw e;
        } catch (Throwable e) {
            String appName = consumerConfig.getAppName();
            if (LOGGER.isWarnEnabled(appName)) {
                LOGGER.warnWithApp(appName,
                    "Catch exception when subscribe from registry: " + registryConfig.getId()
                        + ", but you can ignore if it's called by JVM shutdown hook", e);
            }
        }
    }
    if (respondRegistries != null) {
        try {
            respondRegistries.await(addressWaitTime, TimeUnit.MILLISECONDS);
        } catch (Exception ignore) { // NOPMD
        }
    }
    return new ArrayList<ProviderGroup>(tmpProviderInfoList.values());
}

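Long as it is, subscribeFromRegistries really does just a few things:

Iterate over the configured registries.

Initialize and start each registry, then subscribe to it. Providers are pulled asynchronously; when subscribe returns null, the result was not returned synchronously. Groups that are returned synchronously are merged by group name.

If a wait time is configured, wait up to that long (capped by the maximum wait time) before returning.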
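The waiting behaviour in the last step can be sketched in isolation as follows. This is a simplified illustration built only on java.util.concurrent, not the SOFARPC implementation: AddressWaitSketch, resolveAddressWait, and subscribeAll are hypothetical names, the registries are simulated by tasks that sleep before pushing a provider, and the sketch assumes that each asynchronous push counts the latch down once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of SOFARPC.
class AddressWaitSketch {

    // Same rule as in the method above: a negative value means "use the maximum",
    // any other value is capped by the maximum.
    static int resolveAddressWait(int configured, int max) {
        return configured < 0 ? max : Math.min(configured, max);
    }

    // Simulates subscribing to several registries that push providers asynchronously
    // after pushDelayMs, then waits at most addressWaitMs for all of them to respond.
    static List<String> subscribeAll(List<String> registries, long pushDelayMs, long addressWaitMs) {
        List<String> providers = new CopyOnWriteArrayList<>();
        // A wait time of 0 means "do not wait", so no latch is created
        CountDownLatch responded = addressWaitMs == 0 ? null : new CountDownLatch(registries.size());
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (String registry : registries) {
                pool.submit(() -> {
                    try {
                        Thread.sleep(pushDelayMs); // simulated network delay
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    providers.add("provider-from-" + registry);
                    if (responded != null) {
                        responded.countDown(); // this registry has responded
                    }
                });
            }
            if (responded != null) {
                try {
                    // Returns early once every registry has responded, otherwise after the timeout
                    responded.await(addressWaitMs, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            // Snapshot of whatever has arrived so far
            return new ArrayList<>(providers);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> registries = new ArrayList<>();
        registries.add("registryA");
        registries.add("registryB");
        // Waits for both simulated pushes, so two providers are expected
        System.out.println(subscribeAll(registries, 10, 2000).size());
        // No waiting: returns before the simulated pushes arrive
        System.out.println(subscribeAll(registries, 300, 0).size());
    }
}
```

The point of the latch is that the caller blocks only until every registry has responded or the timeout expires, whichever comes first, so one slow registry cannot delay startup beyond the configured wait time.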

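3. Initializing the server-side connections

If the providers are not fetched asynchronously, the call to consumerBootstrap#subscribe() returns a non-null result, and execution then enters updateAllProviders. So let's look at what that method does.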
