/**
 * Final stage of request handling: retires outstanding change records covered by
 * this request, applies the request's transaction (if it carries one), and, when
 * the request has a client connection, builds and sends the reply.
 *
 * <p>Flow, as visible in this method:
 * <ol>
 *   <li>Log the request at debug/trace level.</li>
 *   <li>Under the {@code zks.outstandingChanges} lock: drop every leading
 *       change record whose zxid is {@code <= request.zxid}, call
 *       {@code zks.processTxn} when {@code request.hdr} is non-null, and hand
 *       quorum-type requests to {@code addCommittedProposal}.</li>
 *   <li>Return early when the request has no connection ({@code request.cnxn == null}),
 *       after closing the session on the connection factory for a
 *       {@code closeSession} transaction.</li>
 *   <li>Otherwise build a response record for the request type, map any
 *       exception to an error code, and send the reply header and record.</li>
 * </ol>
 *
 * <p>Note: parts of the {@code switch} over {@code request.type} are elided in
 * this excerpt (the {@code ......} lines), so only the {@code setData} case is
 * documented here.
 *
 * @param request the request to finish processing; its {@code hdr}, {@code txn},
 *                {@code cnxn}, {@code type}, {@code zxid}, {@code cxid},
 *                {@code sessionId}, {@code createTime} and {@code request}
 *                fields are read by this method
 */
public void processRequest(Request request) {
if (LOG .isDebugEnabled()) {
LOG.debug( "Processing request:: " + request);
}
// request.addRQRec(">final");
// Pings are traced under their own mask; everything else uses the client-request mask.
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace. SERVER_PING_TRACE_MASK;
}
if (LOG .isTraceEnabled()) {
ZooTrace. logRequest( LOG, traceMask, 'E' , request, "" );
}
// Result of applying the transaction; stays null when request.hdr is null.
ProcessTxnResult rc = null ;
synchronized (zks.outstandingChanges ) {
// Repeatedly take the first ChangeRecord from outstandingChanges while its
// zxid is less than or equal to request.zxid, and remove it.
while (!zks .outstandingChanges .isEmpty()
&& zks.outstandingChanges .get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges .remove(0);
// A strictly smaller zxid is unexpected here, so it is reported as a warning.
if (cr.zxid < request.zxid) {
LOG.warn( "Zxid outstanding "
+ cr. zxid
+ " is less than current " + request.zxid );
}
// Only drop the per-path entry if it still refers to this exact record
// (identity comparison), so a different record for the same path is kept.
if (zks .outstandingChangesForPath .get(cr.path) == cr) {
zks.outstandingChangesForPath .remove(cr.path);
}
}
// If request.hdr is not null, apply this request's transaction via
// zks.processTxn (per the original note: against the in-memory DataTree).
if (request.hdr != null) {
TxnHeader hdr = request. hdr;
Record txn = request. txn;
rc = zks.processTxn(hdr, txn);
}
// Check whether this request's type is one that Request.isQuorum accepts
// (per the original note: a request that needs a quorum ack).
// If so, add it to the committed proposals.
if (Request. isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
// A closeSession transaction with no originating connection: close the session
// through the connection factory (when one exists) and stop; there is nobody to reply to.
if (request.hdr != null && request.hdr.getType() == OpCode.closeSession ) {
ServerCnxnFactory scxn = zks.getServerCnxnFactory();
if (scxn != null && request.cnxn == null ) {
scxn.closeSession(request. sessionId);
return ;
}
}
// If the request's cnxn is null, return directly (no response can be sent).
if (request.cnxn == null) {
return ;
}
// Everything below builds the response.
ServerCnxn cnxn = request. cnxn;
// Short operation label passed to cnxn.updateStatsForResponse; "NA" until a case sets it.
String lastOp = "NA" ;
// Only reached for requests that have a connection (see the early returns above).
zks.decInProcess();
Code err = Code . OK;
Record rsp = null;
// NOTE(review): never set to true in the visible code; presumably assigned in an
// elided switch case — confirm.
boolean closeSession = false;
try {
// An error transaction is turned back into the KeeperException it encodes.
if (request.hdr != null && request.hdr.getType() == OpCode.error) {
throw KeeperException.create( KeeperException.Code. get( (
(ErrorTxn) request. txn) .getErr()));
}
// An exception recorded on the request is rethrown, except for multi requests.
KeeperException ke = request.getException();
if (ke != null && request.type != OpCode. multi) {
throw ke;
}
if (LOG .isDebugEnabled()) {
LOG.debug( "{}" ,request);
}
switch (request.type ) {
......
case OpCode.setData: {
lastOp = "SETD" ;
// Build the SetDataResponse.
// NOTE(review): rc is still null if request.hdr was null; rc.stat would then
// throw a NullPointerException, which the catch (Exception) below reports as
// MARSHALLINGERROR — confirm setData requests always carry a header here.
rsp = new SetDataResponse( rc.stat);
err = Code. get(rc .err);
break ;
}
......
} catch (SessionMovedException e) {
// No reply is sent in this case; the connection is told to close the session instead.
cnxn.sendCloseSession();
return ;
} catch (KeeperException e) {
// If there was a KeeperException, set err from it; the reply is still sent below.
err = e.code();
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error( "Failed to process " + request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request. request;
// Rewind so the dump starts at the beginning of the request buffer.
bb.rewind();
while (bb.hasRemaining()) {
// NOTE(review): Integer.toHexString does not zero-pad, so bytes below 0x10
// contribute a single digit and the dump is not a fixed two-digits-per-byte encoding.
sb.append(Integer. toHexString(bb.get() & 0xff));
}
LOG.error( "Dumping request buffer: 0x" + sb.toString());
err = Code. MARSHALLINGERROR;
}
// Read the last processed zxid.
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
ReplyHeader hdr =
new ReplyHeader(request. cxid, lastZxid, err.intValue());
// Record latency and per-connection statistics before the reply goes out.
zks.serverStats().updateLatency(request.createTime);
cnxn.updateStatsForResponse(request. cxid, lastZxid, lastOp,
request. createTime, System.currentTimeMillis());
try {
// Send the response to the client.
cnxn.sendResponse(hdr, rsp, "response" );
if (closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
// A failure to send is only logged; it is not propagated to the caller.
LOG.error( "FIXMSG" ,e);
}
}
--------------------------------------分割线 --------------------------------------