7.源码分析---SOFARPC是如何实现故障剔除的? (2)

AllConnectConnectionHolder#doReconnect

/** * 存活的客户端列表(保持了长连接,且一切正常的) */ protected ConcurrentMap<ProviderInfo, ClientTransport> aliveConnections = new ConcurrentHashMap<ProviderInfo, ClientTransport>(); /** * 失败待重试的客户端列表(连上后断开的) */ protected ConcurrentMap<ProviderInfo, ClientTransport> retryConnections = new ConcurrentHashMap<ProviderInfo, ClientTransport>(); private void doReconnect() { //获取配置的接口 String interfaceId = consumerConfig.getInterfaceId(); //获取应用名 String appName = consumerConfig.getAppName(); int thisTime = reconnectFlag.incrementAndGet(); boolean print = thisTime % 6 == 0; //是否打印error,每6次打印一次 // 可用的连接集合是否为空 boolean isAliveEmptyFirst = isAvailableEmpty(); // 检查可用连接 todo subHealth for (Map.Entry<ProviderInfo, ClientTransport> alive : aliveConnections.entrySet()) { ClientTransport connection = alive.getValue(); //如果该连接不可用,那么就将该连接从可用连接集合里剔除放入到重试集合里面 if (connection != null && !connection.isAvailable()) { aliveToRetry(alive.getKey(), connection); } } //遍历所有待重试集合 for (Map.Entry<ProviderInfo, ClientTransport> entry : getRetryConnections() .entrySet()) { ProviderInfo providerInfo = entry.getKey(); int providerPeriodCoefficient = CommonUtils.parseNum((Integer) providerInfo.getDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT), 1); if (thisTime % providerPeriodCoefficient != 0) { continue; // 如果命中重连周期,则进行重连 } ClientTransport transport = entry.getValue(); if (LOGGER.isDebugEnabled(appName)) { LOGGER.debugWithApp(appName, "Retry connect to {} provider:{} ...", interfaceId, providerInfo); } try { //重连 transport.connect(); //重连完检查一下该连接是否可用 if (doubleCheck(interfaceId, providerInfo, transport)) { providerInfo.setDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT, 1); //如果该连接可用,则把该连接从重试集合里移除,加入到可用集合里 retryToAlive(providerInfo, transport); } } catch (Exception e) { if (print) { if (LOGGER.isWarnEnabled(appName)) { LOGGER.warnWithApp(appName, "Retry connect to {} provider:{} error ! The exception is " + e .getMessage(), interfaceId, providerInfo); } } else { if (LOGGER.isDebugEnabled(appName)) { LOGGER.debugWithApp(appName, "Retry connect to {} provider:{} error ! The exception is " + e .getMessage(), interfaceId, providerInfo); } } } } if (isAliveEmptyFirst && !isAvailableEmpty()) { // 原来空,变成不空 notifyStateChangeToAvailable(); } }

这个doReconnect方法里面主要做了以下几件事:

检查可用连接集合,如果该连接不可用,那么就将该连接从可用连接集合里剔除放入到重试集合里面。

遍历所有待重试集合,如果该thisTime和providerPeriodCoefficient取模为零,那么就进行重连。

设置监听器。

这里有个细节,在aliveToRetry方法里面是加锁的,尽管aliveConnections和retryConnections都是安全的集合,但是这里有一个if判断,这两步操作并不是线程安全的。

protected void aliveToRetry(ProviderInfo providerInfo, ClientTransport transport) { providerLock.lock(); try { //这里两步操作并不是原子性的,所以需要加锁 if (aliveConnections.remove(providerInfo) != null) { retryConnections.put(providerInfo, transport); } } finally { providerLock.unlock(); } }

由于我们这里并不分析网络是怎么传输和连接的,所以暂时不分析transport#connect,大家只要知道这里是保持一个长连接的就可以了。

接下来我们再看一下doubleCheck方法:

protected boolean doubleCheck(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) { if (transport.isAvailable()) { try { // 睡一下下 防止被连上又被服务端踢下线 Thread.sleep(100); } catch (InterruptedException e) { // ignore } if (transport.isAvailable()) { // double check return true; } else { // 可能在黑名单里,刚连上就断开了 if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) { LOGGER.warnWithApp(consumerConfig.getAppName(), "Connection has been closed after connected (in last 100ms)!" + " Maybe connectionNum of provider has been reached limit," + " or your host is in the blacklist of provider {}/{}", interfaceId, transport.getConfig().getProviderInfo()); } providerInfo.setDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT, 5); return false; } } else { return false; } }

这里面主要是检查一下连接的稳定性,如果一开始连接成功,在100ms内又断开连接,那么就打出警告日志,当看到这个日志在后台的时候需要我们查看一下网络连接的情况。

然后再把reconnectCoefficient属性设置为5,当thisTime与providerPeriodCoefficient取模为0的时候再次尝试连接,其中如果按默认设置的话,需要50秒才会进行重连。

2. 获取服务列表

调用consumerBootstrap#subscribe方法进行获取服务列表,会进入到抽象类DefaultConsumerBootstrap的subscribe方法中。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdxfs.html