ZooKeeper源码分析:Quorum请求的整个流程(2)


        //提交请求,并等待返回结果
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        //如果r.getErr()不能0,则表示有错误,抛出异常
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                    clientPath);
        }
        return response.getStat();
    }
【Follower A, Step 2,3】Follower A的NIOServerCnxn类接到了Client A的请求,会调用ZookeeperServer.processPacket方法。该方法会构建一个Request对象,并调用第一个处理器FollowerRequestProcessor的processRequest方法。该方法将Request对象放入FollowerRequestProcessor.queuedRequests队列中。FollowerRequestProcessor处理器线程会循环从FollowerRequestProcessor.queuedRequests队列中取出Request对象,并继续下面步骤:

1)调用下一个处理器CommitProcessor的processRequest方法。该方法将Request对象放入CommitProcessor.queuedRequests队列中;

2)通过Request.type判断Request类型。若发现是一个Quorum请求,会直接调用Learner.request(request)方法。该方法将Request对象封装成一个Leader.Request的Quorum数据包,并发送给Leader。

OpCode.sync操作也将调用Learner.request方法将请求转发给Leader,但在这之前会先将Request对象加入到pendingSyncs队列中。

FollowerRequestProcessor的run方法如下:

public void run() {
        try {
            while (!finished ) {
                //从queuedRequests队列中取出Request对象
                Request request = queuedRequests .take();
                if (LOG .isTraceEnabled()) {
                    ZooTrace. logRequest( LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK ,
                            'F' , request, "" );
                }
                //当request是Request.requestOfDeath,一个poison pill, 就退出while循环,
                //并结束FollowerRequestProcessor线程
                if (request == Request.requestOfDeath) {
                    break ;
                }


                //我们在提交这个request到leader之前,把这个request传递到下一个处理器。
                //这样我们就准备好从Leader那得到Response
                nextProcessor.processRequest(request);


                //只有Quorum操作和sync操作才会调用Follower.request方法, 转发Leader.REQUEST数据包给Leader
                //sync操作和 Quorum操作有一些不同,
                //我们需要保持跟踪这个sync操作对于的Follower已经挂起,所有我们将它加入pendingSyncs队列中。
                switch (request.type ) {
                case OpCode.sync:
                    //将OpCode.sync放入pendingSyncs队列中
                    zks.pendingSyncs .add(request);
                    zks.getFollower().request(request);
                    break ;
                case OpCode.create:
                case OpCode.delete:
                case OpCode.setData:
                case OpCode.setACL:
                case OpCode.createSession:
                case OpCode.closeSession:
                case OpCode.multi:
                    //Quorum请求,直接调用Folloer.request方法
                    zks.getFollower().request(request);
                    break ;
                }
            }
        } catch (Exception e) {
            LOG.error( "Unexpected exception causing exit" , e);
        }
        LOG.info( "FollowerRequestProcessor exited loop!" );
    }
【Leader A, Step 4】Leader A的LearnerHandler线程会循环读取从Learner获得的Quorum数据包。如果数据包是Learner.REQUEST类型,则会解析Quorum数据包的内容,检查操作类型。
如果操作类型不是OpCode.sync, 则会构造Request对象。并调用ZooKeeperServer.submitRequest方法(和上面Follower接收到请求所使用的submitRequest方法是同一个方法),并最终会调用第一个处理器PrepRequestProcessor的submitRequest方法,将Request对象放入PrepRequestProcessor.submittedRequests队列中。

如果操作类型是OpCode.sync, 会构造Request类型的子类LearnerSyncRequest对象,并同样调用PrepRequestProcessor的submitRequest方法。

LearnerHandler.run方法中对Leader.REQUEST数据包的处理代码如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html