//如果 zxid的低32位都是0, 则直接return
if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack by this method. However,
* the learner sends ack back to the leader after it gets UPTODATE
* so we just ignore the message.
*/
return ;
}
//如果没有未完成的proposal, 则直接return
if (outstandingProposals .size() == 0) {
if (LOG .isDebugEnabled()) {
LOG.debug( "outstanding is 0" );
}
return ;
}
//如果最近提交的proposal的 zxid比ack 的proposal的zxid大,说明 ack的proposal已经提交了, 则直接return
if (lastCommitted >= zxid) {
if (LOG .isDebugEnabled()) {
LOG.debug( "proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long. toHexString( lastCommitted), Long.toHexString(zxid));
}
// The proposal has already been committed
return ;
}
//根据 zxid取出proposal对象
Proposal p = outstandingProposals .get(zxid);
//如果在未完成列表outstandingProposal中没有找到 zxid对于的proposal, 则说明该 zxid对于的Proposal还没有处理。
if (p == null) {
LOG.warn( "Trying to commit future proposal: zxid 0x{} from {}",
Long. toHexString(zxid), followerAddr );
return ;
}
//将发送 ack的Follower的sid放入Proposal.ackSet集合中
p. ackSet.add(sid);
if (LOG .isDebugEnabled()) {
LOG.debug( "Count for zxid: 0x{} is {}" ,
Long. toHexString(zxid), p.ackSet.size());
}
//如果ackSet集合中已经包含了一个 Quorum
if (self .getQuorumVerifier().containsQuorum(p.ackSet)){
if (zxid != lastCommitted +1) {
LOG.warn( "Commiting zxid 0x{} from {} not first!" ,
Long. toHexString(zxid), followerAddr );
LOG.warn( "First is 0x{}" , Long.toHexString (lastCommitted + 1));
}
//从outstandingProposals中删除掉这个 zxid对于的proposal对象
outstandingProposals.remove(zxid);
//如果p.request不等于null, 则将这个proposal放入toBeApplied列表中
if (p.request != null) {
toBeApplied.add(p);
}
if (p.request == null) {
LOG.warn( "Going to commmit null request for proposal: {}", p);
}
//发送Leader.COMMIT 包给所有的Follower
commit(zxid);
//通知所有的Observer
inform(p);
//调用处理器CommitProcessor的commit方法
zk. commitProcessor.commit(p.request );
//如果有sync等着等待这个commit的 zxid,发送Leader.SYNC数据包给对应的Follower
if (pendingSyncs .containsKey(zxid)){
for (LearnerSyncRequest r: pendingSyncs .remove(zxid)) {
sendSync(r);
}
}
}
}
【All Follower, Step 10】Follower.followLeader方法会循环读取从Leader的传输过来的Quorum数据包,并调用Follower.processPacket方法。该方法会根据数据的内容来分发。当发现是Leader.COMMIT类型的Quorum数据包,则会根据Quorum数据包的内容构造一个Request对象,并调用FollowerZooKeeperServer.commit方法。该方法最终会调用处理器CommitRequestProcessor.commit方法,将Request对象放到CommitRequestProcessor.committedRequests队列中。
FollowerZooKeeperServer.commit方法如下: