3. SOFAJRaft源码分析— 是如何进行选举的? (5)

Ballot#grant

public void grant(PeerId peerId) { this.grant(peerId, new PosHint()); } public PosHint grant(PeerId peerId, PosHint hint) { UnfoundPeerId peer = findPeer(peerId, peers, hint.pos0); if (peer != null) { if (!peer.found) { peer.found = true; this.quorum--; } hint.pos0 = peer.index; } else { hint.pos0 = -1; } .... return hint; }

grant方法会根据peerId去集群集合里面去找被封装的UnfoundPeerId实例,然后判断一下,如果没有被记录过,那么就将quorum减一,表示收到一票,然后将found设置为ture表示已经找过了。

在查找UnfoundPeerId实例的时候方法里面做了一个很有趣的设置:
首先在存入到peers集合里面的时候是这样的:

int index = 0; for (PeerId peer : conf) { this.peers.add(new UnfoundPeerId(peer, index++, false)); }

这里会遍历conf,然后会存入index,index从零开始。
然后在查找的时候会传入peerId和posHint还有peers集合:

private UnfoundPeerId findPeer(PeerId peerId, List<UnfoundPeerId> peers, int posHint) { if (posHint < 0 || posHint >= peers.size() || !peers.get(posHint).peerId.equals(peerId)) { for (UnfoundPeerId ufp : peers) { if (ufp.peerId.equals(peerId)) { return ufp; } } return null; } return peers.get(posHint); }

这里传入的posHint默认是-1 ,即如果是第一次传入,那么会遍历整个peers集合,然后一个个比对之后返回。

因为PosHint实例会在调用完之后将pos0设置为peer的index,如果grant方法不是第一次调用,那么在调用findPeer方法的时候就可以直接通过get方法获取,不用再遍历整个集合了。

这种写法也可以运用到平时的代码中去。

调用了grant方法之后会调用Ballot的isGranted判断一下是否达到了半数以上的响应。
Ballot#isGranted

public boolean isGranted() { return this.quorum <= 0 && oldQuorum <= 0; }

即判断一下投票箱里面的票数是不是被减到了0。如果返回是的话,那么就调用electSelf进行选举。

选举的方法暂时先不看,我们来看看预选举的请求是怎么被处理的

响应RequestVoteRequest请求

RequestVoteRequest请求的处理器是在RaftRpcServerFactory的addRaftRequestProcessors方法中被安置的,具体的处理时RequestVoteRequestProcessor。

具体的处理方法是交由processRequest0方法来处理的。

RequestVoteRequestProcessor#processRequest0

public Message processRequest0(RaftServerService service, RequestVoteRequest request, RpcRequestClosure done) { //如果是预选举 if (request.getPreVote()) { return service.handlePreVoteRequest(request); } else { return service.handleRequestVoteRequest(request); } }

因为这个处理器可以处理选举和预选举,所以加了个判断。预选举的方法交给NodeImpl的handlePreVoteRequest来具体实现的。

NodeImpl#handlePreVoteRequest

public Message handlePreVoteRequest(final RequestVoteRequest request) { boolean doUnlock = true; this.writeLock.lock(); try { //校验这个节点是不是正常的节点 if (!this.state.isActive()) { LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm); return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.", getNodeId(), this.state.name()); } final PeerId candidateId = new PeerId(); //发送过来的request请求携带的ServerId格式不能错 if (!candidateId.parse(request.getServerId())) { LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(), request.getServerId()); return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.", request.getServerId()); } boolean granted = false; // noinspection ConstantConditions do { //已经有leader的情况 if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) { LOG.info( "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId); break; } //请求的任期小于当前的任期 if (request.getTerm() < this.currTerm) { LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm); // A follower replicator may not be started when this node become leader, so we must check it. //那么这个节点也可能是leader,所以校验一下请求的节点是不是复制节点,重新加入到replicatorGroup中 checkReplicator(candidateId); break; } else if (request.getTerm() == this.currTerm + 1) { // A follower replicator may not be started when this node become leader, so we must check it. // check replicator state //因为请求的任期和当前的任期相等,那么这个节点也可能是leader, // 所以校验一下请求的节点是不是复制节点,重新加入到replicatorGroup中 checkReplicator(candidateId); } doUnlock = false; this.writeLock.unlock(); //获取最新的日志 final LogId lastLogId = this.logManager.getLastLogId(true); doUnlock = true; this.writeLock.lock(); final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm()); //比较当前节点的日志完整度和请求节点的日志完整度 granted = requestLastLogId.compareTo(lastLogId) >= 0; LOG.info( "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId, lastLogId); } while (false);//这个while蛮有意思,为了用break想尽了办法 return RequestVoteResponse.newBuilder() // .setTerm(this.currTerm) // .setGranted(granted) // .build(); } finally { if (doUnlock) { this.writeLock.unlock(); } } }

这个方法里面也是蛮有意思的,写的很长,但是逻辑很清楚。

首先调用isActive,看一下当前节点是不是正常的节点,不是正常节点要返回Error信息

将请求传过来的ServerId解析到candidateId实例中

校验当前的节点如果有leader,并且leader有效的,那么就直接break,返回granted为false

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgxg.html