//提交请求,并等待返回结果
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
//如果r.getErr()不能0,则表示有错误,抛出异常
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code. get(r.getErr()),
clientPath);
}
return response.getStat();
}
【Follower A, Step 2,3】Follower A的NIOServerCnxn类接到了Client A的请求,会调用ZookeeperServer.processPacket方法。该方法会构建一个Request对象,并调用第一个处理器FollowerRequestProcessor的processRequest方法。该方法将Request对象放入FollowerRequestProcessor.queuedRequests队列中。FollowerRequestProcessor处理器线程会循环从FollowerRequestProcessor.queuedRequests队列中取出Request对象,并继续下面步骤:
1)调用下一个处理器CommitProcessor的processRequest方法。该方法将Request对象放入CommitProcessor.queuedRequests队列中;
2)通过Request.type判断Request类型。若发现是一个Quorum请求,会直接调用Learner.request(request)方法。该方法将Request对象封装成一个Leader.Request的Quorum数据包,并发送给Leader。
OpCode.sync操作也将调用Learner.request方法将请求转发给Leader,但在这之前会先将Request对象加入到pendingSyncs队列中。
FollowerRequestProcessor的run方法如下:
public void run() {
try {
while (!finished ) {
//从queuedRequests队列中取出Request对象
Request request = queuedRequests .take();
if (LOG .isTraceEnabled()) {
ZooTrace. logRequest( LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK ,
'F' , request, "" );
}
//当request是Request.requestOfDeath,一个poison pill, 就退出while循环,
//并结束FollowerRequestProcessor线程
if (request == Request.requestOfDeath) {
break ;
}
//我们在提交这个request到leader之前,把这个request传递到下一个处理器。
//这样我们就准备好从Leader那得到Response
nextProcessor.processRequest(request);
//只有Quorum操作和sync操作才会调用Follower.request方法, 转发Leader.REQUEST数据包给Leader
//sync操作和 Quorum操作有一些不同,
//我们需要保持跟踪这个sync操作对于的Follower已经挂起,所有我们将它加入pendingSyncs队列中。
switch (request.type ) {
case OpCode.sync:
//将OpCode.sync放入pendingSyncs队列中
zks.pendingSyncs .add(request);
zks.getFollower().request(request);
break ;
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
//Quorum请求,直接调用Folloer.request方法
zks.getFollower().request(request);
break ;
}
}
} catch (Exception e) {
LOG.error( "Unexpected exception causing exit" , e);
}
LOG.info( "FollowerRequestProcessor exited loop!" );
}
【Leader A, Step 4】Leader A的LearnerHandler线程会循环读取从Learner获得的Quorum数据包。如果数据包是Learner.REQUEST类型,则会解析Quorum数据包的内容,检查操作类型。
如果操作类型不是OpCode.sync, 则会构造Request对象。并调用ZooKeeperServer.submitRequest方法(和上面Follower接收到请求所使用的submitRequest方法是同一个方法),并最终会调用第一个处理器PrepRequestProcessor的submitRequest方法,将Request对象放入PrepRequestProcessor.submittedRequests队列中。
如果操作类型是OpCode.sync, 会构造Request类型的子类LearnerSyncRequest对象,并同样调用PrepRequestProcessor的submitRequest方法。
LearnerHandler.run方法中对Leader.REQUEST数据包的处理代码如下: