这个方法由两个作用,如果传入的newLeaderId不是个空的,那么就会设置一个新的leader,并向状态机发送一个START_FOLLOWING事件;如果传入的newLeaderId是空的,那么就会发送一个STOP_FOLLOWING事件,并把当前的leader置空。
启动electionTimer,进行leader选举electionTimer是RepeatedTimer的实现类,在这里我就不多说了,上一篇文章已经介绍过了。
我这里来看看electionTimer的onTrigger方法是怎么处理选举事件的,electionTimer的onTrigger方法会调用NodeImpl的handleElectionTimeout方法,所以直接看这个方法:
NodeImpl#handleElectionTimeout
private void handleElectionTimeout() { boolean doUnlock = true; this.writeLock.lock(); try { if (this.state != State.STATE_FOLLOWER) { return; } //如果当前选举没有超时则说明此轮选举有效 if (isCurrentLeaderValid()) { return; } resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.", this.leaderId)); doUnlock = false; //预投票 (pre-vote) 环节 //候选者在发起投票之前,先发起预投票, // 如果没有得到半数以上节点的反馈,则候选者就会识趣的放弃参选 preVote(); } finally { if (doUnlock) { this.writeLock.unlock(); } } }在这个方法里,首先会加上一个写锁,然后进行校验,最后先发起一个预投票。
校验的时候会校验当前的状态是不是follower,校验leader和follower上次的通信时间是不是超过了ElectionTimeoutMs,如果没有超过,说明leader存活,没必要发起选举;如果通信超时,那么会将leader置空,然后调用预选举。
NodeImpl#isCurrentLeaderValid
private boolean isCurrentLeaderValid() { return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs(); }用当前时间和上次leader通信时间相减,如果小于ElectionTimeoutMs(默认1s),那么就没有超时,说明leader有效
预选票preVote我们在handleElectionTimeout方法中最后调用了preVote方法,接下来重点看一下这个方法。
下面我将preVote拆分成几部分来进行讲解:
NodeImpl#preVote part1
这部分代码是一开始进到preVote这个方法首先要经过一些校验,例如当前的节点不能再安装快照的时候进行选举;查看一下当前的节点是不是在自己设置的conf里面,conf这个属性会包含了集群的所有节点;最后设置一下当前的任期后解锁。
NodeImpl#preVote part2
private void preVote() { .... //返回最新的log实体类 final LogId lastLogId = this.logManager.getLastLogId(true); boolean doUnlock = true; this.writeLock.lock(); try { // pre_vote need defense ABA after unlock&writeLock //因为在上面没有重新加锁的间隙里可能会被别的线程改变了,所以这里校验一下 if (oldTerm != this.currTerm) { LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm); return; } //初始化预投票投票箱 this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf()); for (final PeerId peer : this.conf.listPeers()) { //如果遍历的节点是当前节点就跳过 if (peer.equals(this.serverId)) { continue; } //失联的节点也跳过 if (!this.rpcService.connect(peer.getEndpoint())) { LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint()); continue; } //设置一个回调的类 final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm); //向被遍历到的这个节点发送一个预投票的请求 done.request = RequestVoteRequest.newBuilder() // .setPreVote(true) // it's a pre-vote request. .setGroupId(this.groupId) // .setServerId(this.serverId.toString()) // .setPeerId(peer.toString()) // .setTerm(this.currTerm + 1) // next term,注意这里发送过去的任期会加一 .setLastLogIndex(lastLogId.getIndex()) // .setLastLogTerm(lastLogId.getTerm()) // .build(); this.rpcService.preVote(peer.getEndpoint(), done.request, done); } //自己也可以投给自己 this.prevVoteCtx.grant(this.serverId); if (this.prevVoteCtx.isGranted()) { doUnlock = false; electSelf(); } } finally { if (doUnlock) { this.writeLock.unlock(); } } }这部分代码:
首先会获取最新的log信息,由LogId封装,里面包含两部分,一部分是这个日志的index和写入这个日志所对应当时节点的一个term任期
初始化预投票投票箱
遍历所有的集群节点
如果遍历的节点是当前节点就跳过,如果遍历的节点因为宕机或者手动下线等原因连接不上也跳过
向遍历的节点发送一个RequestVoteRequest请求预投票给自己
最后因为自己也是集群节点的一员,所以自己也投票给自己
初始化预投票投票箱