3. SOFAJRaft源码分析— 是如何进行选举的? (2)

调用stopVoteTimer和stopStepDownTimer方法里面主要是调用相应的RepeatedTimer的stop方法,在stop方法里面会将stopped状态设置为ture,并将timeout设置为取消,并将这个timeout加入到cancelledTimeouts集合中去:
如果看了2. SOFAJRaft源码分析—JRaft的定时任务调度器是怎么做的?这篇文章的话,那么下面这段代码应该一看就明白是怎么回事了的。

public void stop() { this.lock.lock(); try { if (this.stopped) { return; } this.stopped = true; if (this.timeout != null) { this.timeout.cancel(); this.running = false; this.timeout = null; } } finally { this.lock.unlock(); } } 状态机处理LEADER_STOP事件

在调用NodeImpl的onLeaderStop方法中,实际上是调用了FSMCallerImpl的onLeaderStop方法
NodeImpl#onLeaderStop

private void onLeaderStop(final Status status) { this.replicatorGroup.clearFailureReplicators(); this.fsmCaller.onLeaderStop(status); }

FSMCallerImpl#onLeaderStop

public boolean onLeaderStop(final Status status) { return enqueueTask((task, sequence) -> { //设置当前task的状态为LEADER_STOP task.type = TaskType.LEADER_STOP; task.status = new Status(status); }); } private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) { if (this.shutdownLatch != null) { // Shutting down LOG.warn("FSMCaller is stopped, can not apply new task."); return false; } //使用Disruptor发布事件 this.taskQueue.publishEvent(tpl); return true; }

这个方法里像taskQueue队列里面发布了一个LEADER_STOP事件,taskQueue是在FSMCallerImpl的init方法中被初始化的:

public boolean init(final FSMCallerOptions opts) { ..... this.disruptor = DisruptorBuilder.<ApplyTask>newInstance() // .setEventFactory(new ApplyTaskFactory()) // .setRingBufferSize(opts.getDisruptorBufferSize()) // .setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-", true)) // .setProducerType(ProducerType.MULTI) // .setWaitStrategy(new BlockingWaitStrategy()) // .build(); this.disruptor.handleEventsWith(new ApplyTaskHandler()); this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName())); this.taskQueue = this.disruptor.start(); ..... }

在taskQueue中发布了一个任务之后会交给ApplyTaskHandler进行处理

ApplyTaskHandler

private class ApplyTaskHandler implements EventHandler<ApplyTask> { // max committed index in current batch, reset to -1 every batch private long maxCommittedIndex = -1; @Override public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception { this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch); } }

每当有任务到达taskQueue队列的时候会调用ApplyTaskHandler的onEvent方法来处理事件,具体的执行逻辑由runApplyTask方法进行处理

FSMCallerImpl#runApplyTask

private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) { CountDownLatch shutdown = null; ... switch (task.type) { ... case LEADER_STOP: this.currTask = TaskType.LEADER_STOP; doLeaderStop(task.status); break; ... } .... }

在runApplyTask方法里会对很多的事件进行处理,我们这里只看LEADER_STOP是怎么做的:

在switch里会调用doLeaderStop方法,这个方法会调用到FSMCallerImpl里面封装的StateMachine状态机的onLeaderStart方法:

private void doLeaderStop(final Status status) { this.fsm.onLeaderStop(status); }

这样就可以对leader停止时进行客制化的处理了。

重置leader

接下来会调用resetLeaderId(PeerId.emptyPeer(), status);方法来重置leader

private void resetLeaderId(final PeerId newLeaderId, final Status status) { if (newLeaderId.isEmpty()) { //这个判断表示如果当前节点是候选者或者是Follower,并且已经有leader了 if (!this.leaderId.isEmpty() && this.state.compareTo(State.STATE_TRANSFERRING) > 0) { //向状态机装发布停止跟随该leader的事件 this.fsmCaller.onStopFollowing(new LeaderChangeContext(this.leaderId.copy(), this.currTerm, status)); } //把当前的leader设置为一个空值 this.leaderId = PeerId.emptyPeer(); } else { //如果当前节点没有leader if (this.leaderId == null || this.leaderId.isEmpty()) { //那么发布要跟随该leader的事件 this.fsmCaller.onStartFollowing(new LeaderChangeContext(newLeaderId, this.currTerm, status)); } this.leaderId = newLeaderId.copy(); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgxg.html