public void processRequest(Request si) {
if (si.type != OpCode.sync){
//构建Leader.ACK Quorum包
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null ,
null );
try {
//将Leader.ACK Quorum数据包发送给Leader
learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn( "Closing connection to leader, exception during packet send", e);
try {
if (!learner .sock .isClosed()) {
learner.sock .close();
}
} catch (IOException e1) {
// Nothing to do, we are shutting things down, so an exception here is irrelevant
LOG.debug( "Ignoring error closing the connection" , e1);
}
}
}
}
【Leader A, Step 9】LearnerHandler线程循环读取从Learner那获得的Quorum数据包。当发现是从Follower传输过来的Leader.ACK类型数据包,则会调用Leader.processAck方法进行处理。在Leader.processAck方法中,若已经有一个Follower Quorom发送了Leader.ACK数据包,则会执行下列三步骤:
1)调用Leader.commit方法,发送Leader.COMMIT类型Quorum数据包给所有 Follower;
2)调用Leader.inform 方法,通知所有的Observer;
3)调用处理器CommitRequestProcessor.commit 方法,将Request对象放到CommitRequestProcessor.committedRequests队列中。(【Leader A, Step 10(1)-1,10(1)-2】CommitProcessor线程会从CommitRequestProcessor.committedRequests队列中取出提交的Request对象,发现是和nextPending是一致的,然后提交的Request对象内容替换nextPending的内容,并将nextPending放入到toProcess队列中。下一次循环会从toProcess队列中取出nextPending,然后调用下一个处理器Leader.ToBeAppliedRequestProcessor的processRequest方法。该方法会调用下一个处理器FinalRequestProcessor的processRequest方法。FinalRequestProcessor.processRequest方法并根据Request对象中的操作更新内存中Session信息或者znode数据。)
Leader的processAck方法如下:
/**
* 保存某个proposal接收到的Ack数量
*
* @param zxid
* 被发送的proposal的zxid
* @param followerAddr
*/
synchronized public void processAck( long sid, long zxid, SocketAddress followerAddr) {
if (LOG .isTraceEnabled()) {
LOG.trace( "Ack zxid: 0x{}" , Long.toHexString (zxid));
for (Proposal p : outstandingProposals .values()) {
long packetZxid = p.packet.getZxid();
LOG.trace( "outstanding proposal: 0x{}" ,
Long. toHexString(packetZxid));
}
LOG.trace( "outstanding proposals all" );
}