ZooKeeper源码分析:Quorum请求的整个流程(7)

public void processRequest(Request si) {
        if (si.type != OpCode.sync){
            //构建Leader.ACK Quorum包
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null ,
                null );
            try {
                //将Leader.ACK Quorum数据包发送给Leader
                learner.writePacket(qp, false);
            } catch (IOException e) {
                LOG.warn( "Closing connection to leader, exception during packet send", e);
                try {
                    if (!learner .sock .isClosed()) {
                        learner.sock .close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug( "Ignoring error closing the connection" , e1);
                }
            }
        }
    }
【Leader A, Step 9】LearnerHandler线程循环读取从Learner那获得的Quorum数据包。当发现是从Follower传输过来的Leader.ACK类型数据包,则会调用Leader.processAck方法进行处理。在Leader.processAck方法中,若已经有一个Follower Quorom发送了Leader.ACK数据包,则会执行下列三步骤:

1)调用Leader.commit方法,发送Leader.COMMIT类型Quorum数据包给所有 Follower;

2)调用Leader.inform 方法,通知所有的Observer;

3)调用处理器CommitRequestProcessor.commit 方法,将Request对象放到CommitRequestProcessor.committedRequests队列中。(【Leader A, Step 10(1)-1,10(1)-2】CommitProcessor线程会从CommitRequestProcessor.committedRequests队列中取出提交的Request对象,发现是和nextPending是一致的,然后提交的Request对象内容替换nextPending的内容,并将nextPending放入到toProcess队列中。下一次循环会从toProcess队列中取出nextPending,然后调用下一个处理器Leader.ToBeAppliedRequestProcessor的processRequest方法。该方法会调用下一个处理器FinalRequestProcessor的processRequest方法。FinalRequestProcessor.processRequest方法并根据Request对象中的操作更新内存中Session信息或者znode数据。)

Leader的processAck方法如下:

/**
    * 保存某个proposal接收到的Ack数量
    *
    * @param zxid
    *                被发送的proposal的zxid
    * @param followerAddr
    */
    synchronized public void processAck( long sid, long zxid, SocketAddress followerAddr) {
        if (LOG .isTraceEnabled()) {
            LOG.trace( "Ack zxid: 0x{}" , Long.toHexString (zxid));
            for (Proposal p : outstandingProposals .values()) {
                long packetZxid = p.packet.getZxid();
                LOG.trace( "outstanding proposal: 0x{}" ,
                        Long. toHexString(packetZxid));
            }
            LOG.trace( "outstanding proposals all" );
        }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html