ZooKeeper源码分析:Quorum请求的整个流程(8)


        //如果 zxid的低32位都是0, 则直接return
        if ((zxid & 0xffffffffL) == 0) {
            /*
            * We no longer process NEWLEADER ack by this method. However,
            * the learner sends ack back to the leader after it gets UPTODATE
            * so we just ignore the message.
            */
            return ;
        }
        //如果没有未完成的proposal, 则直接return
        if (outstandingProposals .size() == 0) {
            if (LOG .isDebugEnabled()) {
                LOG.debug( "outstanding is 0" );
            }
            return ;
        }
        //如果最近提交的proposal的 zxid比ack 的proposal的zxid大,说明 ack的proposal已经提交了, 则直接return
        if (lastCommitted >= zxid) {
            if (LOG .isDebugEnabled()) {
                LOG.debug( "proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                        Long. toHexString( lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return ;
        }
        //根据 zxid取出proposal对象
        Proposal p = outstandingProposals .get(zxid);
        //如果在未完成列表outstandingProposal中没有找到 zxid对于的proposal, 则说明该 zxid对于的Proposal还没有处理。
        if (p == null) {
            LOG.warn( "Trying to commit future proposal: zxid 0x{} from {}",
                    Long. toHexString(zxid), followerAddr );
            return ;
        }
      //将发送 ack的Follower的sid放入Proposal.ackSet集合中
        p. ackSet.add(sid);
        if (LOG .isDebugEnabled()) {
            LOG.debug( "Count for zxid: 0x{} is {}" ,
                    Long. toHexString(zxid), p.ackSet.size());
        }
        //如果ackSet集合中已经包含了一个 Quorum
        if (self .getQuorumVerifier().containsQuorum(p.ackSet)){
            if (zxid != lastCommitted +1) {
                LOG.warn( "Commiting zxid 0x{} from {} not first!" ,
                        Long. toHexString(zxid), followerAddr );
                LOG.warn( "First is 0x{}" , Long.toHexString (lastCommitted + 1));
            }
            //从outstandingProposals中删除掉这个 zxid对于的proposal对象
            outstandingProposals.remove(zxid);
            //如果p.request不等于null, 则将这个proposal放入toBeApplied列表中
            if (p.request != null) {
                toBeApplied.add(p);
            }


            if (p.request == null) {
                LOG.warn( "Going to commmit null request for proposal: {}", p);
            }
            //发送Leader.COMMIT 包给所有的Follower
            commit(zxid);
            //通知所有的Observer
            inform(p);
            //调用处理器CommitProcessor的commit方法
            zk. commitProcessor.commit(p.request );
            //如果有sync等着等待这个commit的 zxid,发送Leader.SYNC数据包给对应的Follower
            if (pendingSyncs .containsKey(zxid)){
                for (LearnerSyncRequest r: pendingSyncs .remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }
【All Follower, Step 10】Follower.followLeader方法会循环读取从Leader的传输过来的Quorum数据包,并调用Follower.processPacket方法。该方法会根据数据的内容来分发。当发现是Leader.COMMIT类型的Quorum数据包,则会根据Quorum数据包的内容构造一个Request对象,并调用FollowerZooKeeperServer.commit方法。该方法最终会调用处理器CommitRequestProcessor.commit方法,将Request对象放到CommitRequestProcessor.committedRequests队列中。

FollowerZooKeeperServer.commit方法如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html