Ballot#grant
public void grant(PeerId peerId) { this.grant(peerId, new PosHint()); } public PosHint grant(PeerId peerId, PosHint hint) { UnfoundPeerId peer = findPeer(peerId, peers, hint.pos0); if (peer != null) { if (!peer.found) { peer.found = true; this.quorum--; } hint.pos0 = peer.index; } else { hint.pos0 = -1; } .... return hint; }grant方法会根据peerId去集群集合里面去找被封装的UnfoundPeerId实例,然后判断一下,如果没有被记录过,那么就将quorum减一,表示收到一票,然后将found设置为ture表示已经找过了。
在查找UnfoundPeerId实例的时候方法里面做了一个很有趣的设置:
首先在存入到peers集合里面的时候是这样的:
这里会遍历conf,然后会存入index,index从零开始。
然后在查找的时候会传入peerId和posHint还有peers集合:
这里传入的posHint默认是-1 ,即如果是第一次传入,那么会遍历整个peers集合,然后一个个比对之后返回。
因为PosHint实例会在调用完之后将pos0设置为peer的index,如果grant方法不是第一次调用,那么在调用findPeer方法的时候就可以直接通过get方法获取,不用再遍历整个集合了。
这种写法也可以运用到平时的代码中去。
调用了grant方法之后会调用Ballot的isGranted判断一下是否达到了半数以上的响应。
Ballot#isGranted
即判断一下投票箱里面的票数是不是被减到了0。如果返回是的话,那么就调用electSelf进行选举。
选举的方法暂时先不看,我们来看看预选举的请求是怎么被处理的
响应RequestVoteRequest请求RequestVoteRequest请求的处理器是在RaftRpcServerFactory的addRaftRequestProcessors方法中被安置的,具体的处理时RequestVoteRequestProcessor。
具体的处理方法是交由processRequest0方法来处理的。
RequestVoteRequestProcessor#processRequest0
public Message processRequest0(RaftServerService service, RequestVoteRequest request, RpcRequestClosure done) { //如果是预选举 if (request.getPreVote()) { return service.handlePreVoteRequest(request); } else { return service.handleRequestVoteRequest(request); } }因为这个处理器可以处理选举和预选举,所以加了个判断。预选举的方法交给NodeImpl的handlePreVoteRequest来具体实现的。
NodeImpl#handlePreVoteRequest
public Message handlePreVoteRequest(final RequestVoteRequest request) { boolean doUnlock = true; this.writeLock.lock(); try { //校验这个节点是不是正常的节点 if (!this.state.isActive()) { LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm); return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.", getNodeId(), this.state.name()); } final PeerId candidateId = new PeerId(); //发送过来的request请求携带的ServerId格式不能错 if (!candidateId.parse(request.getServerId())) { LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(), request.getServerId()); return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.", request.getServerId()); } boolean granted = false; // noinspection ConstantConditions do { //已经有leader的情况 if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) { LOG.info( "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId); break; } //请求的任期小于当前的任期 if (request.getTerm() < this.currTerm) { LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm); // A follower replicator may not be started when this node become leader, so we must check it. //那么这个节点也可能是leader,所以校验一下请求的节点是不是复制节点,重新加入到replicatorGroup中 checkReplicator(candidateId); break; } else if (request.getTerm() == this.currTerm + 1) { // A follower replicator may not be started when this node become leader, so we must check it. // check replicator state //因为请求的任期和当前的任期相等,那么这个节点也可能是leader, // 所以校验一下请求的节点是不是复制节点,重新加入到replicatorGroup中 checkReplicator(candidateId); } doUnlock = false; this.writeLock.unlock(); //获取最新的日志 final LogId lastLogId = this.logManager.getLastLogId(true); doUnlock = true; this.writeLock.lock(); final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm()); //比较当前节点的日志完整度和请求节点的日志完整度 granted = requestLastLogId.compareTo(lastLogId) >= 0; LOG.info( "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId, lastLogId); } while (false);//这个while蛮有意思,为了用break想尽了办法 return RequestVoteResponse.newBuilder() // .setTerm(this.currTerm) // .setGranted(granted) // .build(); } finally { if (doUnlock) { this.writeLock.unlock(); } } }这个方法里面也是蛮有意思的,写的很长,但是逻辑很清楚。
首先调用isActive,看一下当前节点是不是正常的节点,不是正常节点要返回Error信息
将请求传过来的ServerId解析到candidateId实例中
校验当前的节点如果有leader,并且leader有效的,那么就直接break,返回granted为false