3. SOFAJRaft源码分析— 是如何进行选举的? (6)

如果当前的任期大于请求的任期,那么调用checkReplicator检查自己是不是leader,如果是leader,那么将当前节点从failureReplicators移除,重新加入到replicatorMap中。然后直接break

请求任期和当前任期相等的情况也要校验,只是不用break

如果请求的日志比当前的最新的日志还要新,那么返回granted为true,代表授权成功

这里有一个有意思的地方是,因为java中只能在循环中goto,所以这里使用了do-while(false)只做单次的循环,这样就可以do代码块里使用break了。

下面稍微看一下checkReplicator:
NodeImpl#checkReplicator

private void checkReplicator(final PeerId candidateId) { if (this.state == State.STATE_LEADER) { this.replicatorGroup.checkReplicator(candidateId, false); } }

这里判断一下是不是leader,然后就会调用ReplicatorGroupImpl的checkReplicator

ReplicatorGroupImpl#checkReplicator

private final ConcurrentMap<PeerId, ThreadId> replicatorMap = new ConcurrentHashMap<>(); private final Set<PeerId> failureReplicators = new ConcurrentHashSet<>(); public void checkReplicator(final PeerId peer, final boolean lockNode) { //根据传入的peer获取相应的ThreadId final ThreadId rid = this.replicatorMap.get(peer); // noinspection StatementWithEmptyBody if (rid == null) { // Create replicator if it's not found for leader. final NodeImpl node = this.commonOptions.getNode(); if (lockNode) { node.writeLock.lock(); } try { //如果当前的节点是leader,并且传入的peer在failureReplicators中,那么重新添加到replicatorMap if (node.isLeader() && this.failureReplicators.contains(peer) && addReplicator(peer)) { this.failureReplicators.remove(peer); } } finally { if (lockNode) { node.writeLock.unlock(); } } } else { // NOPMD // Unblock it right now. // Replicator.unBlockAndSendNow(rid); } }

checkReplicator会从replicatorMap根据传入的peer实例校验一下是不是为空。因为replicatorMap里面存放了集群所有的节点。然后通过ReplicatorGroupImpl的commonOptions获取当前的Node实例,如果当前的node实例是leader,并且如果存在失败集合failureReplicators中的话就重新添加进replicatorMap中。

ReplicatorGroupImpl#addReplicator

public boolean addReplicator(final PeerId peer) { //校验当前的任期 Requires.requireTrue(this.commonOptions.getTerm() != 0); //如果replicatorMap里面已经有这个节点了,那么将它从failureReplicators集合中移除 if (this.replicatorMap.containsKey(peer)) { this.failureReplicators.remove(peer); return true; } //赋值一个新的ReplicatorOptions final ReplicatorOptions opts = this.commonOptions == null ? new ReplicatorOptions() : this.commonOptions.copy(); //新的ReplicatorOptions添加这个PeerId opts.setPeerId(peer); final ThreadId rid = Replicator.start(opts, this.raftOptions); if (rid == null) { LOG.error("Fail to start replicator to peer={}.", peer); this.failureReplicators.add(peer); return false; } return this.replicatorMap.put(peer, rid) == null; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgxg.html