3. SOFAJRaft源码分析— 是如何进行选举的? (3)

这个方法由两个作用,如果传入的newLeaderId不是个空的,那么就会设置一个新的leader,并向状态机发送一个START_FOLLOWING事件;如果传入的newLeaderId是空的,那么就会发送一个STOP_FOLLOWING事件,并把当前的leader置空。

启动electionTimer,进行leader选举

electionTimer是RepeatedTimer的实现类,在这里我就不多说了,上一篇文章已经介绍过了。

我这里来看看electionTimer的onTrigger方法是怎么处理选举事件的,electionTimer的onTrigger方法会调用NodeImpl的handleElectionTimeout方法,所以直接看这个方法:

NodeImpl#handleElectionTimeout

private void handleElectionTimeout() { boolean doUnlock = true; this.writeLock.lock(); try { if (this.state != State.STATE_FOLLOWER) { return; } //如果当前选举没有超时则说明此轮选举有效 if (isCurrentLeaderValid()) { return; } resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.", this.leaderId)); doUnlock = false; //预投票 (pre-vote) 环节 //候选者在发起投票之前,先发起预投票, // 如果没有得到半数以上节点的反馈,则候选者就会识趣的放弃参选 preVote(); } finally { if (doUnlock) { this.writeLock.unlock(); } } }

在这个方法里,首先会加上一个写锁,然后进行校验,最后先发起一个预投票。

校验的时候会校验当前的状态是不是follower,校验leader和follower上次的通信时间是不是超过了ElectionTimeoutMs,如果没有超过,说明leader存活,没必要发起选举;如果通信超时,那么会将leader置空,然后调用预选举。

NodeImpl#isCurrentLeaderValid

private boolean isCurrentLeaderValid() { return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs(); }

用当前时间和上次leader通信时间相减,如果小于ElectionTimeoutMs(默认1s),那么就没有超时,说明leader有效

预选票preVote

我们在handleElectionTimeout方法中最后调用了preVote方法,接下来重点看一下这个方法。

下面我将preVote拆分成几部分来进行讲解:
NodeImpl#preVote part1

private void preVote() { long oldTerm; try { LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm); if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) { LOG.warn( "Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.", getNodeId()); return; } //conf里面记录了集群节点的信息,如果当前的节点不包含在集群里说明是由问题的 if (!this.conf.contains(this.serverId)) { LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf); return; } //设置一下当前的任期 oldTerm = this.currTerm; } finally { this.writeLock.unlock(); } .... }

这部分代码是一开始进到preVote这个方法首先要经过一些校验,例如当前的节点不能再安装快照的时候进行选举;查看一下当前的节点是不是在自己设置的conf里面,conf这个属性会包含了集群的所有节点;最后设置一下当前的任期后解锁。

NodeImpl#preVote part2

private void preVote() { .... //返回最新的log实体类 final LogId lastLogId = this.logManager.getLastLogId(true); boolean doUnlock = true; this.writeLock.lock(); try { // pre_vote need defense ABA after unlock&writeLock //因为在上面没有重新加锁的间隙里可能会被别的线程改变了,所以这里校验一下 if (oldTerm != this.currTerm) { LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm); return; } //初始化预投票投票箱 this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf()); for (final PeerId peer : this.conf.listPeers()) { //如果遍历的节点是当前节点就跳过 if (peer.equals(this.serverId)) { continue; } //失联的节点也跳过 if (!this.rpcService.connect(peer.getEndpoint())) { LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint()); continue; } //设置一个回调的类 final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm); //向被遍历到的这个节点发送一个预投票的请求 done.request = RequestVoteRequest.newBuilder() // .setPreVote(true) // it's a pre-vote request. .setGroupId(this.groupId) // .setServerId(this.serverId.toString()) // .setPeerId(peer.toString()) // .setTerm(this.currTerm + 1) // next term,注意这里发送过去的任期会加一 .setLastLogIndex(lastLogId.getIndex()) // .setLastLogTerm(lastLogId.getTerm()) // .build(); this.rpcService.preVote(peer.getEndpoint(), done.request, done); } //自己也可以投给自己 this.prevVoteCtx.grant(this.serverId); if (this.prevVoteCtx.isGranted()) { doUnlock = false; electSelf(); } } finally { if (doUnlock) { this.writeLock.unlock(); } } }

这部分代码:

首先会获取最新的log信息,由LogId封装,里面包含两部分,一部分是这个日志的index和写入这个日志所对应当时节点的一个term任期

初始化预投票投票箱

遍历所有的集群节点

如果遍历的节点是当前节点就跳过,如果遍历的节点因为宕机或者手动下线等原因连接不上也跳过

向遍历的节点发送一个RequestVoteRequest请求预投票给自己

最后因为自己也是集群节点的一员,所以自己也投票给自己

初始化预投票投票箱

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgxg.html