ZooKeeper源码分析:Quorum请求的整个流程(11)

public void processRequest(Request request) {
        if (LOG .isDebugEnabled()) {
            LOG.debug( "Processing request:: " + request);
        }
        // request.addRQRec(">final");
        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
        if (request.type == OpCode.ping) {
            traceMask = ZooTrace. SERVER_PING_TRACE_MASK;
        }
        if (LOG .isTraceEnabled()) {
            ZooTrace. logRequest( LOG, traceMask, 'E' , request, "" );
        }
        ProcessTxnResult rc = null ;
        synchronized (zks.outstandingChanges ) {
          //循环从outstandingChanges中取出小于等于request.zxid的ChangeRecord,并删除
            while (!zks .outstandingChanges .isEmpty()
                    && zks.outstandingChanges .get(0).zxid <= request.zxid) {
                ChangeRecord cr = zks.outstandingChanges .remove(0);
                if (cr.zxid < request.zxid) {
                    LOG.warn( "Zxid outstanding "
                            + cr. zxid
                            + " is less than current " + request.zxid );
                }
                if (zks .outstandingChangesForPath .get(cr.path) == cr) {
                    zks.outstandingChangesForPath .remove(cr.path);
                }
            }
 
            //如果request.hdr不等于null, 则在内存 Datatree中处理这个请求
            if (request.hdr != null) {
              TxnHeader hdr = request. hdr;
              Record txn = request. txn;
 
              rc = zks.processTxn(hdr, txn);
            }
            //检测这个request的类型是否是需要 Quorum Ack 的requrest
            //如果是,加入到committedProposal中
            if (Request. isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }
 
        if (request.hdr != null && request.hdr.getType() == OpCode.closeSession ) {
            ServerCnxnFactory scxn = zks.getServerCnxnFactory();
            if (scxn != null && request.cnxn == null ) {
                scxn.closeSession(request. sessionId);
                return ;
            }
        }
        //如果request的 cnxn为null, 则直接return
        if (request.cnxn == null) {
            return ;
        }
 
        //下面是构造response
        ServerCnxn cnxn = request. cnxn;
 
        String lastOp = "NA" ;
        zks.decInProcess();
        Code err = Code . OK;
        Record rsp = null;
        boolean closeSession = false;
        try {
            if (request.hdr != null && request.hdr.getType() == OpCode.error) {
                throw KeeperException.create( KeeperException.Code. get( (
                        (ErrorTxn) request. txn) .getErr()));
            }
 
            KeeperException ke = request.getException();
            if (ke != null && request.type != OpCode. multi) {
                throw ke;
            }
 
            if (LOG .isDebugEnabled()) {
                LOG.debug( "{}" ,request);
            }
            switch (request.type ) {
            ......
            case OpCode.setData: {
                lastOp = "SETD" ;
                //构建SetDataResponse
                rsp = new SetDataResponse( rc.stat);
                err = Code. get(rc .err);
                break ;
            }
            ......
        } catch (SessionMovedException e) {
            cnxn.sendCloseSession();
            return ;
        } catch (KeeperException e) {
            //如果有KeeperException,则设置err
            err = e.code();
        } catch (Exception e) {
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error( "Failed to process " + request, e);
            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request. request;
            bb.rewind();
            while (bb.hasRemaining()) {
                sb.append(Integer. toHexString(bb.get() & 0xff));
            }
            LOG.error( "Dumping request buffer: 0x" + sb.toString());
            err = Code. MARSHALLINGERROR;
        }
        //读取最后 zxid
        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
        ReplyHeader hdr =
            new ReplyHeader(request. cxid, lastZxid, err.intValue());
 
        zks.serverStats().updateLatency(request.createTime);
        cnxn.updateStatsForResponse(request. cxid, lastZxid, lastOp,
                    request. createTime, System.currentTimeMillis());
 
        try {
            //发送Response给客户端
            cnxn.sendResponse(hdr, rsp, "response" );
            if (closeSession) {
                cnxn.sendCloseSession();
            }
        } catch (IOException e) {
            LOG.error( "FIXMSG" ,e);
        }
    } 

--------------------------------------分割线 --------------------------------------

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html