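ReadIndexEventHandler is an inner class of ReadOnlyServiceImpl. It keeps an events list as a field and uses it to batch events: when the list reaches 32 events, or the current event is the last one currently available in the Disruptor ring buffer (endOfBatch), it calls ReadOnlyServiceImpl#executeReadIndexEvents to process the whole batch.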
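The batching rule can be sketched on its own, without Disruptor. This is a hypothetical illustration, not the jraft source: Event and BatchingHandler are made-up names, the onEvent(event, endOfBatch) shape only mimics Disruptor's EventHandler callback, and the limit of 32 is taken from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event type standing in for ReadIndexEvent
final class Event {
    final String requestContext;

    Event(final String requestContext) {
        this.requestContext = requestContext;
    }
}

// Hypothetical handler that batches events the way the text describes
final class BatchingHandler {
    private final int maxBatch;
    private final Consumer<List<Event>> executor;
    // Buffer kept as a field and reused across onEvent calls
    private final List<Event> events = new ArrayList<>();

    BatchingHandler(final int maxBatch, final Consumer<List<Event>> executor) {
        this.maxBatch = maxBatch;
        this.executor = executor;
    }

    // endOfBatch is true for the last event currently available to the consumer
    void onEvent(final Event event, final boolean endOfBatch) {
        this.events.add(event);
        if (this.events.size() >= this.maxBatch || endOfBatch) {
            // Hand over a copy, then clear the buffer for the next batch
            this.executor.accept(new ArrayList<>(this.events));
            this.events.clear();
        }
    }
}

class BatchingDemo {
    public static void main(final String[] args) {
        final List<Integer> batchSizes = new ArrayList<>();
        final BatchingHandler handler = new BatchingHandler(32, batch -> batchSizes.add(batch.size()));
        // 70 events arrive in one burst; only the last one is flagged endOfBatch
        for (int i = 0; i < 70; i++) {
            handler.onEvent(new Event("ctx-" + i), i == 69);
        }
        System.out.println(batchSizes); // [32, 32, 6]
    }
}
```

Flushing on endOfBatch means a lone request is not held back waiting for 31 more to arrive, while a burst of requests is still grouped into a few large batches.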
ReadOnlyServiceImpl#executeReadIndexEvents
```java
private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // Build a single ReadIndexRequest for the whole batch
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());
    // One ReadIndexState per event, so every caller can be notified individually later
    final List<ReadIndexState> states = new ArrayList<>(events.size());
    for (final ReadIndexEvent event : events) {
        // Each event's request context becomes one entry of the batched request
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();
    // Hand the batch to the node; the closure carries all the states
    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}
```
executeReadIndexEvents packs the batch into a single ReadIndexRequest and wraps the list of ReadIndexState objects in a ReadIndexResponseClosure, preparing for the steps that follow.
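The point of carrying the states in the closure is that one response for the batch can complete every waiting caller. A minimal sketch of that fan-out, assuming hypothetical ReadState and BatchResponseClosure classes in place of ReadIndexState and ReadIndexResponseClosure; the real closure does more work than this (it is not reproduced here), so treat this only as an illustration of the one-request, many-callbacks shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical per-request state, standing in for ReadIndexState
final class ReadState {
    final String requestContext;
    // Callback receiving (readIndex, errorMessage); errorMessage is null on success
    final BiConsumer<Long, String> done;

    ReadState(final String requestContext, final BiConsumer<Long, String> done) {
        this.requestContext = requestContext;
        this.done = done;
    }
}

// Hypothetical closure: the single response for the batch is fanned out to every caller
final class BatchResponseClosure {
    private final List<ReadState> states;

    BatchResponseClosure(final List<ReadState> states) {
        this.states = states;
    }

    void run(final boolean success, final long readIndex, final String error) {
        for (final ReadState state : this.states) {
            if (success) {
                state.done.accept(readIndex, null);
            } else {
                state.done.accept(-1L, error);
            }
        }
    }
}

class FanOutDemo {
    public static void main(final String[] args) {
        final List<String> log = new ArrayList<>();
        final List<ReadState> states = new ArrayList<>();
        for (final String ctx : new String[] {"a", "b", "c"}) {
            states.add(new ReadState(ctx, (index, err) -> log.add(ctx + "@" + index)));
        }
        // One request for the whole batch, one closure holding all states
        new BatchResponseClosure(states).run(true, 42L, null);
        System.out.println(log); // [a@42, b@42, c@42]
    }
}
```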
NodeImpl#handleReadIndexRequest
```java
public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}
```
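A linearizable read can be initiated on any node in the cluster; it does not have to run on the Leader, and Followers are allowed to serve it, which takes much of the read load off the Leader.
When the read runs on a Follower, the Follower actually calls RpcService#readIndex(leaderId.getEndpoint(), newRequest, -1, closure) to send a ReadIndex request to the Leader, and the Leader carries out the consistency check. That is why this article focuses on the Leader side of the read.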
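The role-based dispatch in handleReadIndexRequest, including the Follower's detour through the Leader, can be modeled in a few lines. This is a hypothetical, simplified model: NodeRole, ReadIndexDispatcher and the two LongSupplier parameters are made-up stand-ins for readLeader and the RPC that readFollower sends; locking, metrics and the real Status type are left out.

```java
import java.util.function.LongSupplier;

// Hypothetical subset of node roles; jraft's real State enum has more values
enum NodeRole { LEADER, FOLLOWER, TRANSFERRING, CANDIDATE }

// Hypothetical, simplified model of the switch in handleReadIndexRequest
final class ReadIndexDispatcher {
    // Stands in for readLeader: the Leader computes the read index itself
    private final LongSupplier leaderPath;
    // Stands in for readFollower: the Follower sends a ReadIndex RPC to the Leader
    private final LongSupplier forwardToLeader;

    ReadIndexDispatcher(final LongSupplier leaderPath, final LongSupplier forwardToLeader) {
        this.leaderPath = leaderPath;
        this.forwardToLeader = forwardToLeader;
    }

    String handle(final NodeRole role) {
        switch (role) {
            case LEADER:
                return "OK index=" + this.leaderPath.getAsLong();
            case FOLLOWER:
                // The Follower does not decide the read index itself; the Leader does
                return "OK index=" + this.forwardToLeader.getAsLong() + " (from leader)";
            case TRANSFERRING:
                return "EBUSY: Is transferring leadership.";
            default:
                return "EPERM: Invalid state for readIndex: " + role + ".";
        }
    }
}

class DispatchDemo {
    public static void main(final String[] args) {
        // Both paths end up with the Leader's committed index, 42 in this demo
        final ReadIndexDispatcher d = new ReadIndexDispatcher(() -> 42L, () -> 42L);
        for (final NodeRole role : NodeRole.values()) {
            System.out.println(role + " -> " + d.handle(role));
        }
    }
}
```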
On the Leader, execution continues into NodeImpl#readLeader.
NodeImpl#readLeader
```java
private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    // 1. Get the quorum (majority) size of the cluster
    final int quorum = getQuorum();
    if (quorum <= 1) {
        // Only one peer, fast path.
        // With a single node in the cluster, invoke the callback right away with success
        respBuilder.setSuccess(true) //
            .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }
    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
    // 2. The terms must match.
    // LogManager looks up the term of the BallotBox's lastCommittedIndex and compares it with the current term.
    // If they differ, this Leader has not committed any log entry in its own term, so the read is rejected.
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
            .run(new Status(
                RaftError.EAGAIN,
                "ReadIndex request rejected because leader has not committed any log entry at its term, " +
                "logIndex=%d, currTerm=%d.",
                lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);
    if (request.getPeerId() != null) {
        // request from follower, check if the follower is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        // 3. A request from a Follower must come from a peer in the current configuration
        if (!this.conf.contains(peer)) {
            closure
                .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer,
                    this.conf));
            return;
        }
    }
    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    // 4. With ReadOnlyLeaseBased, check that the Leader is still within its lease
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }
    switch (readOnlyOpt) {
        // 5. ReadOnlySafe: confirm leadership with a round of heartbeats
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            // Callback invoked for the heartbeat responses
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers.
            // If more than half of the nodes answer the heartbeat,
            // the Leader can be sure it is still the Leader.
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        // 6. No election can happen within the lease, so the Leader cannot have changed;
        // return the result through the callback directly
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
```
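readLeader first computes the quorum, i.e. the majority of the cluster: n / 2 + 1 (integer division) for n nodes. If the cluster has only one node, the read succeeds immediately and the callback is invoked.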
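As a quick worked check of the majority formula, the sketch below prints the quorum for clusters of 1 to 5 nodes. Quorum.of is a hypothetical helper that only applies the formula from the paragraph above; it is not jraft's getQuorum.

```java
// Hypothetical helper: majority (quorum) size for a cluster of n voting peers
final class Quorum {
    static int of(final int peers) {
        // Integer division: 3 -> 2, 4 -> 3, 5 -> 3
        return peers / 2 + 1;
    }

    public static void main(final String[] args) {
        for (int n = 1; n <= 5; n++) {
            // Only n == 1 gives quorum 1, which triggers the single-node fast path
            System.out.println(n + " peers -> quorum " + of(n) + (of(n) <= 1 ? " (fast path)" : ""));
        }
    }
}
```

Note that a 4-node cluster needs 3 votes, the same as a 5-node cluster, so adding a fourth node does not let the cluster tolerate more failures than three nodes do.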
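Once the cluster is known to have more than one node and the term of lastCommittedIndex matches the current term, the response builder's index is set to the BallotBox's lastCommittedIndex. If the terms differ, the Leader has not yet committed an entry in its own term, and the request is rejected with EAGAIN.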
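Why the term check matters: a newly elected Leader may still see a lastCommittedIndex that belongs to an earlier term, and in Raft that index is not yet a safe lower bound for reads until the Leader has committed an entry of its own term. The sketch below is a hypothetical model of this check; TermCheck and its Map stand in for LogManager#getTerm, and -1 stands in for the EAGAIN rejection.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the term check; a Map stands in for LogManager#getTerm
final class TermCheck {
    private final Map<Long, Long> termByIndex = new HashMap<>();

    void append(final long index, final long term) {
        this.termByIndex.put(index, term);
    }

    // Returns the read index, or -1 when the request must be rejected (EAGAIN in readLeader)
    long readIndex(final long lastCommittedIndex, final long currTerm) {
        final long termAtCommit = this.termByIndex.getOrDefault(lastCommittedIndex, 0L);
        return termAtCommit == currTerm ? lastCommittedIndex : -1L;
    }
}

class TermCheckDemo {
    public static void main(final String[] args) {
        final TermCheck log = new TermCheck();
        for (long i = 1; i <= 5; i++) {
            log.append(i, 2L); // entries 1..5 were written in term 2
        }
        // A new Leader of term 3 whose commit index still points at a term-2 entry rejects reads
        System.out.println(log.readIndex(5L, 3L)); // -1
        // Once it has committed an entry of its own term (index 6), reads can proceed
        log.append(6L, 3L);
        System.out.println(log.readIndex(6L, 3L)); // 6
    }
}
```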
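If the request came from a Follower, readLeader checks that the Follower is part of the current configuration; if it is not, the callback is invoked immediately with an EPERM error.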
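Steps 4 to 6 are only explained in the code comments of readLeader. The sketch below models them under stated assumptions: an expired lease downgrades lease-based reads to the safe mode, and in the safe mode the Leader counts its own vote plus follower heartbeat acks, answering once a quorum is reached or failing once a quorum can no longer be reached. ReadMode and HeartbeatQuorum are hypothetical names, and the failure threshold peerCount - quorum + 1 is my own derivation, not taken from ReadIndexHeartbeatResponseClosure.

```java
// Hypothetical read modes standing in for ReadOnlyOption
enum ReadMode { SAFE, LEASE_BASED }

// Hypothetical model of steps 4 and 5
final class HeartbeatQuorum {
    private final int quorum;
    private final int failThreshold;
    private int acks;
    private int failures;
    private Boolean decision; // null until success or failure is known

    HeartbeatQuorum(final int quorum, final int peerCount) {
        this.quorum = quorum;
        // peerCount - 1 followers, of which quorum - 1 must ack; after
        // peerCount - quorum + 1 failures a majority can no longer be reached
        this.failThreshold = peerCount - quorum + 1;
    }

    // Step 4: an expired lease downgrades LEASE_BASED to SAFE
    static ReadMode effectiveMode(final ReadMode configured, final boolean leaseValid) {
        return configured == ReadMode.LEASE_BASED && !leaseValid ? ReadMode.SAFE : configured;
    }

    // Step 5: called once per follower heartbeat response; returns null while undecided
    synchronized Boolean onHeartbeatResponse(final boolean ok) {
        if (this.decision != null) {
            return this.decision; // late responses do not change the outcome
        }
        if (ok) {
            this.acks++;
        } else {
            this.failures++;
        }
        if (this.acks + 1 >= this.quorum) { // + 1: the Leader votes for itself
            this.decision = Boolean.TRUE;
        } else if (this.failures >= this.failThreshold) {
            this.decision = Boolean.FALSE;
        }
        return this.decision;
    }
}

class HeartbeatDemo {
    public static void main(final String[] args) {
        System.out.println(HeartbeatQuorum.effectiveMode(ReadMode.LEASE_BASED, false)); // SAFE
        // 5 peers, quorum 3: the Leader plus two follower acks confirm leadership
        final HeartbeatQuorum hq = new HeartbeatQuorum(3, 5);
        System.out.println(hq.onHeartbeatResponse(true));  // null
        System.out.println(hq.onHeartbeatResponse(false)); // null
        System.out.println(hq.onHeartbeatResponse(true));  // true
    }
}
```

The lease-based path (step 6) skips the heartbeat round entirely, trading one network round trip for a dependence on bounded clock drift between nodes.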