addReplicator does two main things: 1. it removes the node being added from the failureReplicators set; 2. it puts that node into the replicatorMap.
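The two steps can be sketched as follows. This is a minimal, hypothetical model, not JRaft's own replicator group: peers are plain strings, `ReplicatorHandle` is an invented stand-in for a running replicator, and starting a replicator (which can itself fail) is reduced to a map insertion.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a running replicator (not a JRaft type).
final class ReplicatorHandle {
    final String peer;

    ReplicatorHandle(String peer) {
        this.peer = peer;
    }
}

// Minimal sketch of the bookkeeping in addReplicator; peers are plain "ip:port" strings.
final class ReplicatorGroupSketch {
    // Peers whose replicator previously failed to start.
    final Set<String> failureReplicators = ConcurrentHashMap.newKeySet();
    // Peers that currently have a replicator.
    final Map<String, ReplicatorHandle> replicatorMap = new ConcurrentHashMap<>();

    // Returns true if a new replicator was registered for the peer.
    boolean addReplicator(String peer) {
        // 1. The peer is being added, so drop it from the failure set.
        failureReplicators.remove(peer);
        // 2. Register it in replicatorMap; putIfAbsent leaves an existing entry untouched.
        return replicatorMap.putIfAbsent(peer, new ReplicatorHandle(peer)) == null;
    }
}

class ReplicatorGroupDemo {
    public static void main(String[] args) {
        ReplicatorGroupSketch group = new ReplicatorGroupSketch();
        group.failureReplicators.add("127.0.0.1:8082");
        System.out.println(group.addReplicator("127.0.0.1:8082")); // true
        System.out.println(group.failureReplicators.contains("127.0.0.1:8082")); // false
        System.out.println(group.addReplicator("127.0.0.1:8082")); // false, already registered
    }
}
```

Using `putIfAbsent` is a choice made for this sketch so that adding the same peer twice does not replace a replicator that is already running.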
Voting: electSelf

```java
// should be called with writeLock held (the first finally block releases it)
private void electSelf() {
    long oldTerm;
    try {
        LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
        // 1. If the current node is not in the configuration, do not run an election
        if (!this.conf.contains(this.serverId)) {
            LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
            return;
        }
        // 2. A formal election is starting, so stop the election timer that drives pre-vote
        if (this.state == State.STATE_FOLLOWER) {
            LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
            this.electionTimer.stop();
        }
        // 3. Clear the leader, become a candidate, bump the term and vote for itself
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
            "A follower's leader_id is reset to NULL as it begins to request_vote."));
        this.state = State.STATE_CANDIDATE;
        this.currTerm++;
        this.votedId = this.serverId.copy();
        LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
        // 4. Start the vote timer, since the vote may fail and have to be retried
        this.voteTimer.start();
        // 5. Initialize the ballot box
        this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        oldTerm = this.currTerm;
    } finally {
        this.writeLock.unlock();
    }

    final LogId lastLogId = this.logManager.getLastLogId(true);

    this.writeLock.lock();
    try {
        // vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
            return;
        }
        // 6. Iterate over all peers and ask every other node for its vote
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(false) // It's not a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm) //
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
        }

        this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
        this.voteCtx.grant(this.serverId);
        if (this.voteCtx.isGranted()) {
            // 7. The vote succeeded, so become leader
            becomeLeader();
        }
    } finally {
        this.writeLock.unlock();
    }
}
```

Don't be put off by the length of this method: much of it closely mirrors the pre-vote method preVote covered earlier. Because it is long, I numbered the key steps in the comments; let's go through them in order:
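1. Validate the current node: if it is not a member of the current configuration, it does not start an election.
2. If the node is a Follower, stop its election timer. That timer is what triggers pre-vote, and since a formal election is now starting, it is no longer needed.
3. Clear the leader before the election, switch the state to CANDIDATE and increment currTerm. Note that votedId is set to the current node, meaning the node votes for itself.
4. Start the vote timer, because the vote may fail and need to be retried: when voteTimer fires while the node is still a CANDIDATE, it calls electSelf again to start another round.
5. Call init to initialize the ballot box (voteCtx), just as with prevVoteCtx in pre-vote. If the configuration is not stable (a membership change is in progress), the old configuration is passed in as well, so a quorum is needed in both.
6. Iterate over all peers and send a RequestVoteRequest to every other node, again as in preVote; these requests are handled by RequestVoteRequestProcessor. Before that, the write lock was released while fetching the last log ID, so after re-acquiring it the method first checks that currTerm has not changed in the meantime (the ABA check) and abandons this round if it has.
7. After persisting its term and vote through metaStorage and granting its own vote, the node checks the ballot: if more than half of the nodes have voted for it, it calls becomeLeader to become leader. At this point that can only succeed if its own vote is already a majority (for example, in a single-node cluster); the other nodes' votes arrive later through the OnRequestVoteRpcDone callbacks.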
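Steps 3, 5 and 7 can be illustrated with a single-threaded sketch. Everything here is hypothetical: `SimpleBallot` only models the role of voteCtx (a majority in the new configuration and, during a membership change, also in the old one), `CandidateSketch` leaves out locking, timers, RPCs and persistence, and `onVoteGranted` is an invented stand-in for handling a granted RequestVoteResponse.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical quorum counter playing the role of voteCtx; not JRaft's Ballot class.
final class SimpleBallot {
    private final Set<String> conf = new HashSet<>();
    private final Set<String> oldConf = new HashSet<>();
    private final Set<String> granted = new HashSet<>();
    private int quorum;
    private int oldQuorum;

    // oldConfOrNull is null when the configuration is stable (no membership change).
    void init(List<String> newConf, List<String> oldConfOrNull) {
        conf.clear();
        oldConf.clear();
        granted.clear();
        conf.addAll(newConf);
        if (oldConfOrNull != null) {
            oldConf.addAll(oldConfOrNull);
        }
        quorum = conf.size() / 2 + 1;
        oldQuorum = oldConf.isEmpty() ? 0 : oldConf.size() / 2 + 1;
    }

    void grant(String peer) {
        granted.add(peer);
    }

    // Granted once a majority of conf, and of oldConf if present, has voted yes.
    boolean isGranted() {
        return votesIn(conf) >= quorum && votesIn(oldConf) >= oldQuorum;
    }

    private int votesIn(Set<String> members) {
        return (int) members.stream().filter(granted::contains).count();
    }
}

// Hypothetical single-threaded model of a node running electSelf.
final class CandidateSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    final String serverId;
    final List<String> conf;
    final SimpleBallot voteCtx = new SimpleBallot();
    State state = State.FOLLOWER;
    long currTerm;
    String votedId;

    CandidateSketch(String serverId, List<String> conf, long currTerm) {
        this.serverId = serverId;
        this.conf = conf;
        this.currTerm = currTerm;
    }

    void electSelf() {
        if (!conf.contains(serverId)) {
            return; // step 1: not a member, no election
        }
        state = State.CANDIDATE; // step 3: become candidate, bump term, vote for self
        currTerm++;
        votedId = serverId;
        voteCtx.init(conf, null); // step 5: a stable configuration is assumed here
        voteCtx.grant(serverId);
        maybeBecomeLeader(); // step 7: succeeds at once only if our own vote is a majority
    }

    // Stand-in for a granted vote arriving from peer for the given term.
    void onVoteGranted(String peer, long term) {
        if (state != State.CANDIDATE || term != currTerm) {
            return; // stale response from an earlier round
        }
        voteCtx.grant(peer);
        maybeBecomeLeader();
    }

    private void maybeBecomeLeader() {
        if (voteCtx.isGranted()) {
            state = State.LEADER;
        }
    }
}

class ElectSelfDemo {
    public static void main(String[] args) {
        CandidateSketch node = new CandidateSketch("n1", Arrays.asList("n1", "n2", "n3"), 1);
        node.electSelf();
        System.out.println(node.state + " term=" + node.currTerm); // CANDIDATE term=2
        node.onVoteGranted("n2", 2);
        System.out.println(node.state); // LEADER
    }
}
```

In a three-node cluster the candidate's own vote is not enough, so it only becomes leader once one more vote for the same term arrives; a one-node cluster becomes leader inside electSelf itself.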
Let's first look at how RequestVoteRequestProcessor handles a vote request:
RequestVoteRequestProcessor's processRequest0 calls NodeImpl's handleRequestVoteRequest, which contains the actual logic.
NodeImpl#handleRequestVoteRequest
```java
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // Is the node still active?
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
                getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
                request.getServerId());
        }

        // noinspection ConstantConditions
        do {
            // check term
            if (request.getTerm() >= this.currTerm) {
                LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // 1. The request's term is higher than the current term:
                // increase current term, change state to follower
                if (request.getTerm() > this.currTerm) {
                    stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                        "Raft node receives higher term RequestVoteRequest."));
                }
            } else {
                // ignore older term
                LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                break;
            }
            doUnlock = false;
            this.writeLock.unlock();

            final LogId lastLogId = this.logManager.getLastLogId(true);

            doUnlock = true;
            this.writeLock.lock();
            // vote need ABA check after unlock&writeLock
            if (request.getTerm() != this.currTerm) {
                LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                break;
            }

            // 2. Check whether the candidate's log is at least as up to date as ours
            final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
                .compareTo(lastLogId) >= 0;

            // 3. Check whether this node has already voted
            if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
                    "Raft node votes for some candidate, step down to restart election_timer."));
                this.votedId = candidateId.copy();
                this.metaStorage.setVotedFor(candidateId);
            }
        } while (false);

        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            // 4. Grant the vote only if the current term equals the request's term
            //    and votedId has been set to the requesting node
            .setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```

This method is also broadly similar to handlePreVoteRequest, so I will only analyze the steps I marked.
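1. Here the current term is lower than the request's term, so stepDown is called to adopt the request's term as the current term and switch this node to Follower. A request with a lower term is simply ignored.
2. A node that becomes leader must have a log at least as complete as the nodes it asks for votes, so this node checks whether the candidate's last log entry (compared by term first, then by index) is at least as up to date as its own.
3. If the candidate's log is OK and this node has not yet voted for another candidate, it sets votedId to the requesting node and persists it through metaStorage.setVotedFor; stepDown is called again to restart the election timer.
4. Send the response: the vote is granted only if the current term equals the request's term and votedId has been set to the requesting node.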
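The voter-side rules in steps 1 to 4 can be condensed into a small model. `LogIdSketch` and `VoterSketch` are hypothetical names, not JRaft classes; the log comparison orders by term first and then by index, and locking, timers and persistence through metaStorage are left out.

```java
// Hypothetical (index, term) pair: a higher term wins, equal terms compare by index.
final class LogIdSketch implements Comparable<LogIdSketch> {
    final long index;
    final long term;

    LogIdSketch(long index, long term) {
        this.index = index;
        this.term = term;
    }

    @Override
    public int compareTo(LogIdSketch o) {
        int c = Long.compare(term, o.term);
        return c != 0 ? c : Long.compare(index, o.index);
    }
}

// Hypothetical voter applying the rules of handleRequestVoteRequest.
final class VoterSketch {
    long currTerm;
    String votedId; // null: no vote cast in currTerm
    final LogIdSketch lastLogId;

    VoterSketch(long currTerm, LogIdSketch lastLogId) {
        this.currTerm = currTerm;
        this.lastLogId = lastLogId;
    }

    boolean handleRequestVote(String candidateId, long reqTerm, LogIdSketch candidateLast) {
        if (reqTerm < currTerm) {
            return false; // older term: ignored
        }
        if (reqTerm > currTerm) {
            currTerm = reqTerm; // step 1: step down, adopting the higher term
            votedId = null;     // a new term starts without a vote
        }
        boolean logIsOk = candidateLast.compareTo(lastLogId) >= 0; // step 2
        if (logIsOk && votedId == null) {
            votedId = candidateId; // step 3: vote for this candidate
        }
        return reqTerm == currTerm && candidateId.equals(votedId); // step 4
    }
}

class VoteDemo {
    public static void main(String[] args) {
        VoterSketch voter = new VoterSketch(3, new LogIdSketch(10, 3));
        System.out.println(voter.handleRequestVote("A", 4, new LogIdSketch(9, 3)));  // false: shorter log
        System.out.println(voter.handleRequestVote("B", 4, new LogIdSketch(5, 4)));  // true: newer last term
        System.out.println(voter.handleRequestVote("C", 4, new LogIdSketch(20, 4))); // false: already voted for B
    }
}
```

Because step 4 only compares the term and votedId, a repeated request from the candidate that already holds this node's vote is granted again, which makes retries harmless.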
Becoming leader