//获得packet的 zxid, 并放入outstandingProposals 未完成Proposal Map中
lastProposed = p.packet.getZxid();
//将p加入到outstandingProposals Map中
outstandingProposals.put( lastProposed , p);
//发送给所有的Follower
sendPacket(pp);
}
return p;
}
Follower.processPacket方法如下:
/**
* 检查在qp中接收到的packet, 并根据它的内容进行分发
* @param qp
* @throws IOException
*/
protected void processPacket(QuorumPacket qp) throws IOException{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break ;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
//从数据包 qp中反序列化出 txn
Record txn = SerializeUtils . deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn( "Got zxid 0x"
+ Long. toHexString(hdr.getZxid())
+ " expected 0x"
+ Long. toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
fzk.logRequest(hdr, txn);
break ;
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break ;
case Leader.UPTODATE:
LOG.error( "Received an UPTODATE message after Follower started");
break ;
case Leader.REVALIDATE:
revalidate(qp);
break ;
case Leader.SYNC:
fzk.sync();
break ;
}
}
FollowerZooKeeperServer的logRequest方法如下:
public void logRequest(TxnHeader hdr, Record txn) {
//构建Request对象
Request request = new Request( null, hdr.getClientId(), hdr.getCxid(),
hdr.getType(), null , null );
request.hdr = hdr;
request.txn = txn;
request.zxid = hdr.getZxid();
//如果request.zxid的低32为不全为0, 则加入pendingTxns队列中
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
//调用SyncRequestProcessor处理这个request
syncProcessor.processRequest(request);
}
【All Followers, Step 8】处理器SyncRequestProcessor的功能和Leader的SyncRequestProcessor一样,将请求记录到日志中,然后将Request请求传递给下一个处理器。不过Follower的下一个处理器是SendAckRequestProcessor。该处理器会构建一个Leader.ACK的Quorum数据包,并发送给Leader。
SendAckRequestProcessor的processRequest方法如下: