ZooKeeper源码分析:Quorum请求的整个流程(4)

protected void pRequest2Txn( int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
    {
        request.hdr = new TxnHeader(request.sessionId , request.cxid, zxid,
                                    zks.getTime(), type);


        switch (type) {
            .....
            case OpCode.setData:
                //检查session
                zks.sessionTracker .checkSession(request.sessionId, request.getOwner());
                //将record转成SetDataRequest类型
                SetDataRequest setDataRequest = ( SetDataRequest)record;
                if (deserialize)
                    //将Request.reques数据反序列化成setDataRequest对象
                    ByteBufferInputStream.byteBuffer2Record(request. request, setDataRequest);
                //获取需要需要修改的znode的path
                path = setDataRequest.getPath();
                //获取内存数据中获取path对于的znode信息
                nodeRecord = getRecordForPath( path);
                //检查对 znode是否有写权限
                checkACL( zks, nodeRecord .acl , ZooDefs.Perms.WRITE,
                        request.authInfo);
                //获取客户端设置的版本号
                version = setDataRequest.getVersion();
                //获取节点当前版本号
                int currentVersion = nodeRecord.stat.getVersion();
                //如果客户端设置的版本号不是-1,且不等于当前版本号,则抛出KeeperException.BadVersionException异常
                if (version != -1 && version != currentVersion) {
                    throw new KeeperException .BadVersionException(path);
                }
                //version等于当前版本加1
                version = currentVersion + 1;
                //构建SetDataTxn对象,并赋给request.txn
                request. txn = new SetDataTxn( path, setDataRequest.getData(), version);
                //拷贝nodeRecord
                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                //将nodeRecord的当前版本号设置为version
                nodeRecord.stat.setVersion( version);
                //将nodeRecord放入outstandingChanges
                //path和nodeRecord map放入outstandingChangesForPath
                addChangeRecord( nodeRecord);
                break ;
          ......
        }
    }
【Leader A, Step 5,6】处理器ProposalRequestProcessor会先判断Request对象是否是LearnerSyncRequest类型。

如果不是LearnerSyncRequest类型(也就是Quorum请求),会按如下步骤执行:

1)调用下一个处理器CommitProcessor的processRequest方法,将Request对象放入CommitProcessor.queuedRequests队列中;

2)将proposal发送到所有的Follower;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html