6. SOFAJRaft源码分析— 透过RheaKV看线性一致性读 (5)

获取 ReadIndex 请求级别 ReadOnlyOption 配置,ReadOnlyOption 参数默认值为 ReadOnlySafe。如果设置的是ReadOnlyLeaseBased,那么会调用isLeaderLeaseValid检查leader是否是在在租约有效时间内

配置为ReadOnlySafe 调用 Replicator#sendHeartbeat(rid, closure) 方法向 Followers 节点发送 Heartbeat 心跳请求,发送心跳成功执行 ReadIndexHeartbeatResponseClosure 心跳响应回调;ReadIndex 心跳响应回调检查是否超过半数节点包括 Leader 节点自身投票赞成,半数以上节点返回客户端Heartbeat 请求成功响应,即 applyIndex 超过 ReadIndex 说明已经同步到 ReadIndex 对应的 Log 能够提供 Linearizable Read

配置为ReadOnlyLeaseBased,因为Leader 租约有效期间认为当前 Leader 是 Raft Group 内的唯一有效 Leader,所以忽略 ReadIndex 发送 Heartbeat 确认身份步骤,直接返回 Follower 节点和本地节点 Read 请求成功响应。Leader 节点继续等待状态机执行,直到 applyIndex 超过 ReadIndex 安全提供 Linearizable Read

无论是ReadOnlySafe还是ReadOnlyLeaseBased,最后发送成功响应都会调用ReadIndexResponseClosure的run方法。

ReadIndexResponseClosure#run

public void run(final Status status) { //fail //传入的状态不是ok,响应失败 if (!status.isOk()) { notifyFail(status); return; } final ReadIndexResponse readIndexResponse = getResponse(); //Fail //response没有响应成功,响应失败 if (!readIndexResponse.getSuccess()) { notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down.")); return; } // Success //一致性读成功 final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request, readIndexResponse.getIndex()); for (final ReadIndexState state : this.states) { // Records current commit log index. //设置当前提交的index state.setIndex(readIndexResponse.getIndex()); } boolean doUnlock = true; ReadOnlyServiceImpl.this.lock.lock(); try { //校验applyIndex 是否超过 ReadIndex if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) { // Already applied, notify readIndex request. ReadOnlyServiceImpl.this.lock.unlock(); doUnlock = false; //已经同步到 ReadIndex 对应的 Log 能够提供 Linearizable Read notifySuccess(readIndexStatus); } else { // Not applied, add it to pending-notify cache. ReadOnlyServiceImpl.this.pendingNotifyStatus .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) // .add(readIndexStatus); } } finally { if (doUnlock) { ReadOnlyServiceImpl.this.lock.unlock(); } } }

Run方法首先会校验一下是否需要响应失败,如果响应成功,那么会将所有封装的ReadIndexState更新一下index,然后校验一下applyIndex 是否超过 ReadIndex,超过了ReadIndex代表所有已经复制到多数派上的 Log(可视为写操作)被视为安全的 Log,该 Log 所体现的数据就能对客户端 Client 可见。

ReadOnlyServiceImpl#notifySuccess

private void notifySuccess(final ReadIndexStatus status) { final long nowMs = Utils.monotonicMs(); final List<ReadIndexState> states = status.getStates(); final int taskCount = states.size(); for (int i = 0; i < taskCount; i++) { final ReadIndexState task = states.get(i); final ReadIndexClosure done = task.getDone(); // stack copy if (done != null) { this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs()); done.setResult(task.getIndex(), task.getRequestContext().get()); done.run(Status.OK()); } } }

如果是响应成功,那么会调用notifySuccess方法,会将status里封装的ReadIndexState集合遍历一遍,调用当中的run方法。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgjfs.html