这个run方法会调用到我们在multiGet中设置的run方法中
RaftRawKVStore#multiGet
public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
if (!readOnlySafe) {
this.kvStore.multiGet(keys, false, closure);
return;
}
// KV 存储实现线性一致读
// 调用 readIndex 方法,等待回调执行
this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(final Status status, final long index, final byte[] reqCtx) {
//如果状态返回成功,
if (status.isOk()) {
RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
return;
}
//readIndex 读取失败尝试应用键值读操作申请任务于 Leader 节点的状态机 KVStoreStateMachine
RaftRawKVStore.this.readIndexExecutor.execute(() -> {
if (isLeader()) {
LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
status);
// If 'read index' read fails, try to applying to the state machine at the leader node
applyOperation(KVOperation.createMultiGet(keys), closure);
} else {
LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
// Client will retry to leader node
new KVClosureAdapter(closure, null).run(status);
}
});
}
});
这个run方法会调用RaftRawKVStore的multiGet从RocksDB中直接获取数据。
总结
我们这篇文章从RheaKVStore的客户端get方法一直讲到,RheaKVStore服务端使用JRaft实现线性一致性读,并讲解了线性一致性读是怎么实现的,通过这个例子大家应该对线性一致性读有了一个相对不错的理解了。