RaftRawKVStore#multiGet
    public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
        if (!readOnlySafe) {
            this.kvStore.multiGet(keys, false, closure);
            return;
        }
        // Linearizable read for the KV store:
        // call readIndex and wait for the callback to run
        this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

            @Override
            public void run(final Status status, final long index, final byte[] reqCtx) {
                // ReadIndex succeeded: read directly from the local store
                if (status.isOk()) {
                    RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                    return;
                }
                // ReadIndex failed: try to apply the read as a task to the
                // leader's state machine (KVStoreStateMachine)
                RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                    if (isLeader()) {
                        LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.", status);
                        // If 'read index' read fails, try to applying to the state machine at the leader node
                        applyOperation(KVOperation.createMultiGet(keys), closure);
                    } else {
                        LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                        // Client will retry to leader node
                        new KVClosureAdapter(closure, null).run(status);
                    }
                });
            }
        });
    }

multiGet calls the node's readIndex method to perform a consistent read and registers a callback. If the callback reports success, the data is read directly through RocksRawKVStore. If it reports failure and the current node is the leader, the read is submitted as a task to the leader's state machine, KVStoreStateMachine; on any other node the failed status is handed back to the client, which retries against the leader.
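The branching in multiGet can be condensed into a small decision function. The following is only an illustrative sketch: the class ReadFallbackSketch, the enums ReadIndexResult and Path, and the method chooseReadPath are hypothetical names invented here, not SOFAJRaft types.

```java
public class ReadFallbackSketch {

    /** Hypothetical outcome of a ReadIndex round (not a SOFAJRaft type). */
    public enum ReadIndexResult { OK, FAILED }

    /** Hypothetical label for the path that ends up serving the read. */
    public enum Path { LOCAL_READ, APPLY_TO_STATE_MACHINE, RETURN_ERROR_TO_CLIENT }

    /**
     * Mirrors the branches of multiGet:
     * - readOnlySafe == false: read the local store without any consistency check
     * - ReadIndex succeeded:   read the local store
     * - ReadIndex failed:      the leader falls back to applying the read through
     *                          the state machine; other nodes return the error
     */
    public static Path chooseReadPath(boolean readOnlySafe, ReadIndexResult result, boolean isLeader) {
        if (!readOnlySafe) {
            return Path.LOCAL_READ;
        }
        if (result == ReadIndexResult.OK) {
            return Path.LOCAL_READ;
        }
        return isLeader ? Path.APPLY_TO_STATE_MACHINE : Path.RETURN_ERROR_TO_CLIENT;
    }

    public static void main(String[] args) {
        System.out.println(chooseReadPath(true, ReadIndexResult.OK, false));
        System.out.println(chooseReadPath(true, ReadIndexResult.FAILED, true));
        System.out.println(chooseReadPath(true, ReadIndexResult.FAILED, false));
    }
}
```

The fallback on the leader trades latency for correctness: going through the state machine costs a full log round trip, but the read is still ordered with respect to all writes.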
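Linearizable reads with readIndex
A linearizable read means, put simply, that if a value is written at time t1, any read issued after t1 must return that value and can never return the older value from before t1. In other words, once a client's write request to the cluster has received a successful response, the result of that write must be visible to every subsequent read. This is comparable to carrying the semantics of Java's volatile keyword over to a distributed system; the difference is that volatile provides visibility between threads, whereas SOFAJRaft has to provide visibility between servers.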
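The linearizable read provided by SOFAJRaft is built on the ReadIndex mechanism of the Raft protocol. A linearizable read request is issued with Node#readIndex(byte[] requestContext, ReadIndexClosure done). The closure passed in is invoked once it is safe to read; in the normal case, the data is then read from the state machine and returned to the client.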
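To make the idea behind ReadIndex concrete, here is a minimal single-process sketch. It is not SOFAJRaft code: the class ReadIndexSketch and all of its members are hypothetical, and the leadership confirmation step (a heartbeat round to a majority) is only marked by a comment. It shows why waiting for the applied index to reach the recorded commit index prevents a stale read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadIndexSketch {
    private final List<String[]> log = new ArrayList<>();           // committed entries: {key, value}
    private final Map<String, String> stateMachine = new HashMap<>();
    private long commitIndex = 0;   // number of committed entries
    private long appliedIndex = 0;  // number of entries applied to the state machine

    /** Simulates a write that has been committed but not yet applied. */
    public void commit(String key, String value) {
        log.add(new String[] { key, value });
        commitIndex++;
    }

    /** Applies the next committed entry, if any. */
    public boolean applyNext() {
        if (appliedIndex >= commitIndex) {
            return false;
        }
        String[] entry = log.get((int) appliedIndex);
        stateMachine.put(entry[0], entry[1]);
        appliedIndex++;
        return true;
    }

    /** Reads the state machine directly; may miss committed but unapplied writes. */
    public String staleGet(String key) {
        return stateMachine.get(key);
    }

    /** ReadIndex-style read. */
    public String linearizableGet(String key) {
        // Step 1: record the current commit index as the read index.
        final long readIndex = commitIndex;
        // Step 2 (omitted in this sketch): confirm leadership with a majority.
        // Step 3: wait until the state machine has applied up to readIndex.
        // A real node waits for its apply thread; here we drive it ourselves.
        while (appliedIndex < readIndex) {
            applyNext();
        }
        // Step 4: now the read is safe.
        return stateMachine.get(key);
    }
}
```

For example, after committing and applying k=v1 and then committing k=v2 without applying it, staleGet("k") still sees v1, while linearizableGet("k") returns v2.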
Node#readIndex
    public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
        if (this.shutdownLatch != null) {
            // Run the callback asynchronously
            Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
            throw new IllegalStateException("Node is shutting down");
        }
        Requires.requireNonNull(done, "Null closure");
        // requestContext is EMPTY_BYTES here
        this.readOnlyService.addRequest(requestContext, done);
    }

readIndex calls ReadOnlyServiceImpl#addRequest, passing along requestContext and the callback done. In this call path, requestContext is BytesUtil.EMPTY_BYTES.
Next, follow the call into addRequest.
ReadOnlyServiceImpl#addRequest
    public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
        if (this.shutdownLatch != null) {
            Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
            throw new IllegalStateException("Service already shutdown.");
        }
        try {
            EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
                event.done = closure;
                // reqCtx is EMPTY_BYTES here
                event.requestContext = new Bytes(reqCtx);
                event.startTime = Utils.monotonicMs();
            };
            int retryTimes = 0;
            while (true) {
                // Published events are consumed by ReadIndexEventHandler
                if (this.readIndexQueue.tryPublishEvent(translator)) {
                    break;
                } else {
                    retryTimes++;
                    if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                        Utils.runClosureInThread(closure,
                            new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                        this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                        LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                        return;
                    }
                    ThreadHelper.onSpinWait();
                }
            }
        } catch (final Exception e) {
            Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
        }
    }

addRequest wraps the incoming reqCtx and closure into an event and publishes it to the readIndexQueue. Once the event is published, it is handled by ReadIndexEventHandler. If publishing fails, it is retried, up to 3 times; after that the closure is completed with an EBUSY status.
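The bounded retry loop in addRequest can be reproduced with nothing but the JDK. The sketch below is an assumption-laden stand-in: it uses a java.util.concurrent BlockingQueue in place of the Disruptor ring buffer, Thread.yield() in place of ThreadHelper.onSpinWait(), and the names BoundedRetryPublishSketch, MAX_RETRY_TIMES and tryPublish are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedRetryPublishSketch {

    // Mirrors the "up to 3 retries" described in the text (assumed value).
    public static final int MAX_RETRY_TIMES = 3;

    /**
     * Tries to enqueue the event without blocking: one initial attempt plus
     * at most MAX_RETRY_TIMES retries. Returns false if the queue stayed full,
     * which is where the real code completes the closure with a busy status.
     */
    public static <T> boolean tryPublish(BlockingQueue<T> queue, T event) {
        int retryTimes = 0;
        while (true) {
            if (queue.offer(event)) {
                return true;
            }
            retryTimes++;
            if (retryTimes > MAX_RETRY_TIMES) {
                return false;
            }
            // Give a consumer the chance to drain the queue before retrying.
            Thread.yield();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(tryPublish(queue, "first"));   // queue has room
        System.out.println(tryPublish(queue, "second"));  // queue is full, no consumer
    }
}
```

Failing fast after a few spins keeps the caller's thread from blocking indefinitely when the node is overloaded; the caller learns about the overload through the returned status instead.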
ReadIndexEventHandler
    private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
        // task list for batch
        private final List<ReadIndexEvent> events = new ArrayList<>(
            ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

        @Override
        public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
                                                                                                      throws Exception {
            if (newEvent.shutdownLatch != null) {
                executeReadIndexEvents(this.events);
                this.events.clear();
                newEvent.shutdownLatch.countDown();
                return;
            }

            this.events.add(newEvent);
            // Execute in batches
            if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
                executeReadIndexEvents(this.events);
                this.events.clear();
            }
        }
    }
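ReadIndexEventHandler accumulates events and flushes them either when the batch is full or when the queue signals the end of the current batch. A minimal sketch of that flush rule follows; BatchingHandlerSketch and its members are hypothetical names, strings stand in for ReadIndexEvent, and the shutdown branch is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingHandlerSketch {
    private final int batchSize;
    private final List<String> pending;
    private final List<List<String>> flushed = new ArrayList<>();

    public BatchingHandlerSketch(int batchSize) {
        this.batchSize = batchSize;
        this.pending = new ArrayList<>(batchSize);
    }

    /**
     * Buffers the event and flushes when the buffer reaches batchSize or when
     * the producer side reports that no more events are immediately available.
     */
    public void onEvent(String event, boolean endOfBatch) {
        pending.add(event);
        if (pending.size() >= batchSize || endOfBatch) {
            // Stand-in for executeReadIndexEvents(events): record a copy of the batch.
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    public List<List<String>> flushedBatches() {
        return flushed;
    }

    public int pendingCount() {
        return pending.size();
    }
}
```

With a batch size of 2, feeding "a" and "b" flushes one full batch, and a following "c" with endOfBatch set to true is flushed on its own, so no event waits in the buffer while the queue is idle.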