ZooKeeper源码分析:Quorum请求的整个流程(9)

/**
    *当接收到一个COMMIT消息,这个方法会被调用。该方法会将COMMIT消息
    *中的zxid和pendingTxns队列中的第一个对象的zxid进行匹配。如何相同,则
    *传递给处理器CommitProcessor进行commit
    * @param zxid - must correspond to the head of pendingTxns if it exists
    */
    public void commit( long zxid ) {
        if (pendingTxns .size() == 0) {
            LOG.warn( "Committing " + Long. toHexString (zxid)
                    + " without seeing txn" );
            return ;
        }
        //取��pendingTxns第一个元素的 zxid
        long firstElementZxid = pendingTxns .element().zxid;
        //如果第一个元素的 zxid不等于COMMIT消息中的 zxid, 则退出程序
        if (firstElementZxid != zxid) {
            LOG.error( "Committing zxid 0x" + Long. toHexString (zxid)
                    + " but next pending txn 0x"
                    + Long. toHexString(firstElementZxid));
            System. exit(12);
        }
        //pendingTxns取出,并删除第一个元素
        Request request = pendingTxns .remove();
        //将从pendingTxns队列中取出的第一个 reqeust对象传递给CommitProcessor处理器进行commit
        commitProcessor.commit(request);
    }
【All Follower, Step 11】处理器CommitProcessor线程会处理提交的Request对象。

如果是Follower A, nextPending对象是和提交Request对象是一致的,所以将提交Request对象内容替换nextPending中的内容,并放入toProcess队列中。在下一个循环会从toProcess队列中取出并传递到下一个迭代器FinalRequestProcessor中。(和Leader中的CommitProcessor线程处理逻辑是一样的)

如果不是Follower A, 则可能有下面两种情况:

1)queuedRequest队列为empty且nextPending为null, 也就是这个Follower没有自己转发的request正在处理;

2)nextPending不为null, 也就是有转发的request正在处理。但nextPending对象一定和提交的Request对象是不一致的。

不管是哪一种,都会直接将提交的Request对象加入到toProcess队列中。处理器CommitProcessor线程会从中取出并传递到下一个迭代器FinalRequestProcessor中。

CommitProcessor.run方法如下:

public void run() {
        try {
            Request nextPending = null;
            while (!finished ) {
                int len = toProcess .size();
                for (int i = 0; i < len; i++) {
                    nextProcessor.processRequest( toProcess .get(i));
                }
                //当将所有的request传递到下一个处理器FinalRequestProcessor后,将toProcess清空
                toProcess.clear();
                synchronized (this ) {
                    //如果queuedRequests队列为空,或者nextPending为null, 或者committedRequest队列为控股,则等待。
                    if ((queuedRequests .size() == 0 || nextPending != null )
                            && committedRequests.size() == 0) {
                        wait();
                        continue ;
                    }
                    //第一步,检查这个commit是否为了pending request而来
                    //如果commit request到来,但是queuedRequests为空,或者nextPending为null
                    if ((queuedRequests .size() == 0 || nextPending != null )
                            && committedRequests.size() > 0) {
                        Request r = committedRequests .remove();
                        /*
                        * We match with nextPending so that we can move to the
                        * next request when it is committed. We also want to
                        * use nextPending because it has the cnxn member set
                        * properly.
                        */
                        //如果nextPending不等于null,
                        if (nextPending != null
                                && nextPending. sessionId == r.sessionId
                                && nextPending. cxid == r.cxid ) {
                            // we want to send our version of the request.
                            // the pointer to the connection in the request
                            nextPending.hdr = r. hdr;
                            nextPending. txn = r.txn ;
                            nextPending. zxid = r.zxid ;
                            toProcess.add(nextPending);
                            nextPending = null ;
                        } else {
                            // this request came from someone else so just
                            // send the commit packet
                          //如果这个请求来自于其他人,则直接加入到toProcess中
                          //sync请求,或者不是Follower发起的请求
                            toProcess.add(r);
                        }
                    }
                }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html