7.源码分析---SOFARPC是如何实现故障剔除的?

我在服务端引用那篇文章里面分析到,服务端在引用的时候会去获取服务端可用的服务,并进行心跳,维护一个可用的集合。

所以我们从客户端初始化这部分说起。

服务连接的维护

客户端初始化的时候会调用cluster#init方法,这里的cluster是继承了AbstractCLuster抽象类,调用的是抽象类里面的init方法。

public synchronized void init() { if (initialized) { // 已初始化 return; } // 构造Router链 routerChain = RouterChain.buildConsumerChain(consumerBootstrap); // 负载均衡策略 考虑是否可动态替换? loadBalancer = LoadBalancerFactory.getLoadBalancer(consumerBootstrap); // 地址管理器 addressHolder = AddressHolderFactory.getAddressHolder(consumerBootstrap); // 连接管理器 connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap); // 构造Filter链,最底层是调用过滤器 this.filterChain = FilterChain.buildConsumerChain(this.consumerConfig, new ConsumerInvoker(consumerBootstrap)); if (consumerConfig.isLazy()) { // 延迟连接 if (LOGGER.isInfoEnabled(consumerConfig.getAppName())) { LOGGER.infoWithApp(consumerConfig.getAppName(), "Connection will be initialized when first invoke."); } } // 启动重连线程 connectionHolder.init(); try { // 得到服务端列表 List<ProviderGroup> all = consumerBootstrap.subscribe(); if (CommonUtils.isNotEmpty(all)) { // 初始化服务端连接(建立长连接) updateAllProviders(all); } } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException("Init provider's transport error!", e); } // 启动成功 initialized = true; // 如果check=true表示强依赖 if (consumerConfig.isCheck() && !isAvailable()) { throw new SofaRpcRuntimeException("The consumer is depend on alive provider " + "and there is no alive provider, you can ignore it " + "by ConsumerConfig.setCheck(boolean) (default is false)"); } }

这上面在服务连接的维护上面主要分为三步:

设置心跳线程,每10秒进行一次心跳

获取服务端列表

初始化服务端连接

1.SOFARPC的心跳线程

AllConnectConnectionHolder#init

这里connectionHolder是AllConnectConnectionHolder的实现类,我们进入到这个类里面看。这里面实际上实现了SOFARPC的心跳检测。

/** * 重连线程 */ private volatile ScheduledService reconThread; public void init() { //如果reconThread没有初始化过,调用startReconnectThread进行初始化 if (reconThread == null) { startReconnectThread(); } } protected void startReconnectThread() { final String interfaceId = consumerConfig.getInterfaceId(); // 启动线程池 // 默认每隔10秒重连 int reconnect = consumerConfig.getReconnectPeriod(); if (reconnect > 0) { reconnect = Math.max(reconnect, 2000); // 最小2000 reconThread = new ScheduledService("CLI-RC-" + interfaceId, ScheduledService.MODE_FIXEDDELAY, new Runnable() { @Override public void run() { try { doReconnect(); } catch (Throwable e) { LOGGER.errorWithApp(consumerConfig.getAppName(), "Exception when retry connect to provider", e); } } }, reconnect, reconnect, TimeUnit.MILLISECONDS).start(); } }

在startReconnectThread方法中,客户端会调用reconnectPeriod变量,如果没有设置则为10秒,如果设置小于10秒则取2秒。也就是说客户端开启的心跳是默认10秒一次,最快也是只能2秒一次。
然后创建了一个ScheduledService实例,并调用其start方法。

我们看一下ScheduledService类是怎么样的结构

ScheduledService

public class ScheduledService { private volatile ScheduledExecutorService scheduledExecutorService; public ScheduledService(String threadName, int mode, Runnable runnable, long initialDelay, long period, TimeUnit unit) { this.threadName = threadName; this.runnable = runnable; this.initialDelay = initialDelay; this.period = period; this.unit = unit; this.mode = mode; } //开始执行定时任务 public synchronized ScheduledService start() { if (started) { return this; } if (scheduledExecutorService == null) { scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(threadName, true)); } ScheduledFuture future = null; //传进来的是MODE_FIXEDDELAY switch (mode) { case MODE_FIXEDRATE: future = scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, unit); break; case MODE_FIXEDDELAY: //创建一个固定延迟的定时任务 future = scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, unit); break; default: break; } if (future != null) { this.future = future; // 缓存一下 SCHEDULED_SERVICE_MAP.put(this, System.currentTimeMillis()); started = true; } else { started = false; } return this; } }

ScheduledService的作用就是创建一个固定延迟的线程,以固定的时间定时执行一下任务。

然后会默认每10秒钟执行一次AllConnectConnectionHolder的doReconnect方法。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdxfs.html