6. SOFAJRaft源码分析— 透过RheaKV看线性一致性读

其实这篇文章我本来想在讲完选举的时候就开始讲线性致性读的,但是感觉直接讲没头没尾的看起来比比较困难,所以就有了RheaKV的系列,这是RheaKV,终于可以讲下SOFAJRaft的线性一致性读是怎么做到了的。所谓线性一致性,一个简单的例子是在 T1 的时间写入一个值,那么在 T1 之后读一定能读到这个值,不可能读到 T1 之前的值。

其中部分内容参考SOFAJRaft文档:
SOFAJRaft 线性一致读实现剖析 | SOFAJRaft 实现原理
SOFAJRaft 实现原理 - SOFAJRaft-RheaKV 是如何使用 Raft 的

RheaKV读取数据

RheaKV的读取数据的入口是DefaultRheaKVStore的bGet。

DefaultRheaKVStore#bGet

public byte[] bGet(final String key) { return FutureHelper.get(get(key), this.futureTimeoutMillis); }

bGet方法中会一直调用到DefaultRheaKVStore的一个get方法中:
DefaultRheaKVStore#get

private CompletableFuture<byte[]> get(final byte[] key, final boolean readOnlySafe, final CompletableFuture<byte[]> future, final boolean tryBatching) { //校验started状态 checkState(); Requires.requireNonNull(key, "key"); if (tryBatching) { final GetBatching getBatching = readOnlySafe ? this.getBatchingOnlySafe : this.getBatching; if (getBatching != null && getBatching.apply(key, future)) { return future; } } internalGet(key, readOnlySafe, future, this.failoverRetries, null, this.onlyLeaderRead); return future; }

get方法会根据传入的参数来判断是否采用批处理的方式来读取数据,readOnlySafe表示是否开启线程一致性读,由于我们调用的是get方法,所以readOnlySafe和tryBatching都会返回true。
所以这里会调用getBatchingOnlySafe的apply方法,将key和future传入。
getBatchingOnlySafe是在我们初始化DefaultRheaKVStore的时候初始化的:
DefaultRheaKVStore#init

..... this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe", new GetBatchingHandler("get_only_safe", true)); .....

在初始化getBatchingOnlySafe的时候传入的处理器是GetBatchingHandler。

然后我们回到getBatchingOnlySafe#apply中,看看这个方法做了什么:

public boolean apply(final byte[] message, final CompletableFuture<byte[]> future) { //GetBatchingHandler return this.ringBuffer.tryPublishEvent((event, sequence) -> { event.reset(); event.key = message; event.future = future; }); }

apply方法会向Disruptor发送一个事件进行异步处理,并把我们的key封装到event的key中。getBatchingOnlySafe的处理器是GetBatchingHandler。

批量获取数据

GetBatchingHandler#onEvent

public void onEvent(final KeyEvent event, final long sequence, final boolean endOfBatch) throws Exception { this.events.add(event); this.cachedBytes += event.key.length; final int size = this.events.size(); //校验一下数据量,没有达到MaxReadBytes并且不是最后一个event,那么直接返回 if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxReadBytes()) { return; } if (size == 1) { reset(); try { //如果只是一个get请求,那么不需要进行批量处理 get(event.key, this.readOnlySafe, event.future, false); } catch (final Throwable t) { exceptionally(t, event.future); } } else { //初始化一个刚刚好大小的集合 final List<byte[]> keys = Lists.newArrayListWithCapacity(size); final CompletableFuture<byte[]>[] futures = new CompletableFuture[size]; for (int i = 0; i < size; i++) { final KeyEvent e = this.events.get(i); keys.add(e.key); futures[i] = e.future; } //遍历完events数据到entries之后,重置 reset(); try { multiGet(keys, this.readOnlySafe).whenComplete((result, throwable) -> { //异步回调处理数据 if (throwable == null) { for (int i = 0; i < futures.length; i++) { final ByteArray realKey = ByteArray.wrap(keys.get(i)); futures[i].complete(result.get(realKey)); } return; } exceptionally(throwable, futures); }); } catch (final Throwable t) { exceptionally(t, futures); } } } }

onEvent方法首先会校验一下当前的event数量有没有达到阈值以及当前的event是不是Disruptor中最后一个event;然后会根据不同的events集合中的数量来走不同的实现,这里做了一个优化,如果是只有一条数据那么不会走批处理;最后将所有的key放入到keys集合中并调用multiGet进行批处理。

multiGet方法会调用internalMultiGet返回一个Future,从而实现异步的返回结果。
DefaultRheaKVStore#internalMultiGet

private FutureGroup<Map<ByteArray, byte[]>> internalMultiGet(final List<byte[]> keys, final boolean readOnlySafe, final int retriesLeft, final Throwable lastCause) { //因为不同的key是存放在不同的region中的,所以一个region会对应多个key,封装到map中 final Map<Region, List<byte[]>> regionMap = this.pdClient .findRegionsByKeys(keys, ApiExceptionHelper.isInvalidEpoch(lastCause)); //返回值 final List<CompletableFuture<Map<ByteArray, byte[]>>> futures = Lists.newArrayListWithCapacity(regionMap.size()); //lastCause传入为null final Errors lastError = lastCause == null ? null : Errors.forException(lastCause); for (final Map.Entry<Region, List<byte[]>> entry : regionMap.entrySet()) { final Region region = entry.getKey(); final List<byte[]> subKeys = entry.getValue(); //重试次数减1,设置一个重试函数 final RetryCallable<Map<ByteArray, byte[]>> retryCallable = retryCause -> internalMultiGet(subKeys, readOnlySafe, retriesLeft - 1, retryCause); final MapFailoverFuture<ByteArray, byte[]> future = new MapFailoverFuture<>(retriesLeft, retryCallable); //发送MultiGetRequest请求,获取数据 internalRegionMultiGet(region, subKeys, readOnlySafe, future, retriesLeft, lastError, this.onlyLeaderRead); futures.add(future); } return new FutureGroup<>(futures); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgjfs.html