6. SOFAJRaft源码分析— 透过RheaKV看线性一致性读 (2)

internalMultiGet里会根据key去组装region,不同的key会对应不同的region,数据时存在region中的,所以要从不同的region中获取数据,region和key是一对多的关系所以这里会封装成一个map。然后会遍历regionMap,每个region所对应的数据作为一个批次调用到internalRegionMultiGet方法中,根据不同的情况获取数据。

DefaultRheaKVStore#internalRegionMultiGet

private void internalRegionMultiGet(final Region region, final List<byte[]> subKeys, final boolean readOnlySafe, final CompletableFuture<Map<ByteArray, byte[]>> future, final int retriesLeft, final Errors lastCause, final boolean requireLeader) { //因为当前的是client,所以这里会是null final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader); // require leader on retry //设置重试函数 final RetryRunner retryRunner = retryCause -> internalRegionMultiGet(region, subKeys, readOnlySafe, future, retriesLeft - 1, retryCause, true); final FailoverClosure<Map<ByteArray, byte[]>> closure = new FailoverClosureImpl<>(future, false, retriesLeft, retryRunner); if (regionEngine != null) { if (ensureOnValidEpoch(region, regionEngine, closure)) { //如果不是null,那么会获取rawKVStore,并从中获取数据 final RawKVStore rawKVStore = getRawKVStore(regionEngine); if (this.kvDispatcher == null) { rawKVStore.multiGet(subKeys, readOnlySafe, closure); } else { //如果是kvDispatcher不为空,那么放入到kvDispatcher中异步执行 this.kvDispatcher.execute(() -> rawKVStore.multiGet(subKeys, readOnlySafe, closure)); } } } else { final MultiGetRequest request = new MultiGetRequest(); request.setKeys(subKeys); request.setReadOnlySafe(readOnlySafe); request.setRegionId(region.getId()); request.setRegionEpoch(region.getRegionEpoch()); //调用rpc请求 this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader); } }

因为我们这里是client端调用internalRegionMultiGet方法的,所以是没有设置regionEngine的,那么会直接向server的当前region所对应的leader节点发送一个MultiGetRequest请求。

因为上面的这些方法基本上和put是一致的,我们已经在5. SOFAJRaft源码分析— RheaKV中如何存放数据?讲过了,所以这里不重复的讲了。

server端处理MultiGetRequest请求

MultiGetRequest请求会被KVCommandProcessor所处理,KVCommandProcessor里会根据请求的magic方法返回值来判断是用什么方式来进行处理。我们这里会调用到DefaultRegionKVService的handleMultiGetRequest方法中处理请求。

public void handleMultiGetRequest(final MultiGetRequest request, final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) { final MultiGetResponse response = new MultiGetResponse(); response.setRegionId(getRegionId()); response.setRegionEpoch(getRegionEpoch()); try { KVParameterRequires.requireSameEpoch(request, getRegionEpoch()); final List<byte[]> keys = KVParameterRequires.requireNonEmpty(request.getKeys(), "multiGet.keys"); //调用MetricsRawKVStore的multiGet方法 this.rawKVStore.multiGet(keys, request.isReadOnlySafe(), new BaseKVStoreClosure() { @SuppressWarnings("unchecked") @Override public void run(final Status status) { if (status.isOk()) { response.setValue((Map<ByteArray, byte[]>) getData()); } else { setFailure(request, response, status, getError()); } closure.sendResponse(response); } }); } catch (final Throwable t) { LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t)); response.setError(Errors.forException(t)); closure.sendResponse(response); } }

handleMultiGetRequest方法会调用MetricsRawKVStore的multiGet方法来批量获取数据。

MetricsRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) { //实例化MetricsKVClosureAdapter对象 final KVStoreClosure c = metricsAdapter(closure, MULTI_GET, keys.size(), 0); //调用RaftRawKVStore的multiGet方法 this.rawKVStore.multiGet(keys, readOnlySafe, c); }

multiGet方法会传入一个MetricsKVClosureAdapter实例,通过这个实例实现异步回调response。然后调用RaftRawKVStore的multiGet方法。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgjfs.html