ZooKeeper源码分析:Quorum请求的整个流程(6)


            //获得packet的 zxid, 并放入outstandingProposals 未完成Proposal Map中
            lastProposed = p.packet.getZxid();
            //将p加入到outstandingProposals Map中
            outstandingProposals.put( lastProposed , p);
            //发送给所有的Follower
            sendPacket(pp);
        }
        return p;
    }   
Follower.processPacket方法如下:

/**
    * 检查在qp中接收到的packet, 并根据它的内容进行分发
    * @param qp
    * @throws IOException
    */
    protected void processPacket(QuorumPacket qp) throws IOException{
        switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break ;
        case Leader.PROPOSAL:
            TxnHeader hdr = new TxnHeader();
            //从数据包 qp中反序列化出 txn
            Record txn = SerializeUtils . deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn( "Got zxid 0x"
                        + Long. toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long. toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
            break ;
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break ;
        case Leader.UPTODATE:
            LOG.error( "Received an UPTODATE message after Follower started");
            break ;
        case Leader.REVALIDATE:
            revalidate(qp);
            break ;
        case Leader.SYNC:
            fzk.sync();
            break ;
        }
        }   
FollowerZooKeeperServer的logRequest方法如下:

public void logRequest(TxnHeader hdr, Record txn) {
        //构建Request对象
        Request request = new Request( null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null , null );
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
        //如果request.zxid的低32为不全为0, 则加入pendingTxns队列中
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        //调用SyncRequestProcessor处理这个request
        syncProcessor.processRequest(request);
        } 
【All Followers, Step 8】处理器SyncRequestProcessor的功能和Leader的SyncRequestProcessor一样,将请求记录到日志中,然后将Request请求传递给下一个处理器。不过Follower的下一个处理器是SendAckRequestProcessor。该处理器会构建一个Leader.ACK的Quorum数据包,并发送给Leader。

SendAckRequestProcessor的processRequest方法如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html