其实这篇文章我本来想在讲完选举的时候就开始讲线性一致性读的,但是感觉直接讲没头没尾的看起来比比较困难,所以就有了RheaKV的系列,这是RheaKV,终于可以讲一下SOFAJRaft的线性一致性读是怎么做到了的。所谓线性一致性,一个简单的例子是在 T1 的时间写入一个值,那么在 T1 之后读一定能读到这个值,不可能读到 T1 之前的值。
其中部分内容参考SOFAJRaft文档:
SOFAJRaft 线性一致读实现剖析 | SOFAJRaft 实现原理
SOFAJRaft 实现原理 - SOFAJRaft-RheaKV 是如何使用 Raft 的
RheaKV的读取数据的入口是DefaultRheaKVStore的bGet。
DefaultRheaKVStore#bGet
public byte[] bGet(final String key) { return FutureHelper.get(get(key), this.futureTimeoutMillis); }bGet方法中会一直调用到DefaultRheaKVStore的一个get方法中:
DefaultRheaKVStore#get
get方法会根据传入的参数来判断是否采用批处理的方式来读取数据,readOnlySafe表示是否开启线程一致性读,由于我们调用的是get方法,所以readOnlySafe和tryBatching都会返回true。
所以这里会调用getBatchingOnlySafe的apply方法,将key和future传入。
getBatchingOnlySafe是在我们初始化DefaultRheaKVStore的时候初始化的:
DefaultRheaKVStore#init
在初始化getBatchingOnlySafe的时候传入的处理器是GetBatchingHandler。
然后我们回到getBatchingOnlySafe#apply中,看看这个方法做了什么:
public boolean apply(final byte[] message, final CompletableFuture<byte[]> future) { //GetBatchingHandler return this.ringBuffer.tryPublishEvent((event, sequence) -> { event.reset(); event.key = message; event.future = future; }); }apply方法会向Disruptor发送一个事件进行异步处理,并把我们的key封装到event的key中。getBatchingOnlySafe的处理器是GetBatchingHandler。
批量获取数据GetBatchingHandler#onEvent
public void onEvent(final KeyEvent event, final long sequence, final boolean endOfBatch) throws Exception { this.events.add(event); this.cachedBytes += event.key.length; final int size = this.events.size(); //校验一下数据量,没有达到MaxReadBytes并且不是最后一个event,那么直接返回 if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxReadBytes()) { return; } if (size == 1) { reset(); try { //如果只是一个get请求,那么不需要进行批量处理 get(event.key, this.readOnlySafe, event.future, false); } catch (final Throwable t) { exceptionally(t, event.future); } } else { //初始化一个刚刚好大小的集合 final List<byte[]> keys = Lists.newArrayListWithCapacity(size); final CompletableFuture<byte[]>[] futures = new CompletableFuture[size]; for (int i = 0; i < size; i++) { final KeyEvent e = this.events.get(i); keys.add(e.key); futures[i] = e.future; } //遍历完events数据到entries之后,重置 reset(); try { multiGet(keys, this.readOnlySafe).whenComplete((result, throwable) -> { //异步回调处理数据 if (throwable == null) { for (int i = 0; i < futures.length; i++) { final ByteArray realKey = ByteArray.wrap(keys.get(i)); futures[i].complete(result.get(realKey)); } return; } exceptionally(throwable, futures); }); } catch (final Throwable t) { exceptionally(t, futures); } } } }onEvent方法首先会校验一下当前的event数量有没有达到阈值以及当前的event是不是Disruptor中最后一个event;然后会根据不同的events集合中的数量来走不同的实现,这里做了一个优化,如果是只有一条数据那么不会走批处理;最后将所有的key放入到keys集合中并调用multiGet进行批处理。
multiGet方法会调用internalMultiGet返回一个Future,从而实现异步的返回结果。
DefaultRheaKVStore#internalMultiGet