5.源码分析---SOFARPC调用服务

我们这一次来接着上一篇文章《4. 源码分析---SOFARPC服务端暴露》讲一下服务暴露之后被客户端调用之后服务端是怎么返回数据的。

示例我们还是和上篇文章一样使用一样的bolt协议来讲:

public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 设置一个协议,默认bolt .setPort(12200) // 设置一个端口,默认12200 .setDaemon(false); // 非守护线程 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setRef(new HelloServiceImpl()) // 指定实现 .setServer(serverConfig); // 指定服务端 providerConfig.export(); // 发布服务 }

在Bolt协议下面,当服务端被调用的时候一个服务的流程如下所示:
BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker

BoltServerProcessor#handleRequest @Override public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request) { // RPC内置上下文 RpcInternalContext context = RpcInternalContext.getContext(); context.setProviderSide(true); String appName = request.getTargetAppName(); if (appName == null) { // 默认全局appName appName = (String) RpcRuntimeContext.get(RpcRuntimeContext.KEY_APPNAME); } // 是否链路异步化中 boolean isAsyncChain = false; try { // 这个 try-finally 为了保证Context一定被清理 processingCount.incrementAndGet(); // 统计值加1 context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 远程地址 context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 远程返回的通道 if (RpcInternalContext.isAttachmentEnable()) { InvokeContext boltInvokeCtx = bizCtx.getInvokeContext(); if (boltInvokeCtx != null) { putToContextIfNotNull(boltInvokeCtx, InvokeContext.BOLT_PROCESS_WAIT_TIME, context, RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME); // rpc线程池等待时间 Long } } if (EventBus.isEnable(ServerReceiveEvent.class)) { EventBus.post(new ServerReceiveEvent(request)); } // 开始处理 SofaResponse response = null; // 响应,用于返回 Throwable throwable = null; // 异常,用于记录 ProviderConfig providerConfig = null; String serviceName = request.getTargetServiceUniqueName(); try { // 这个try-catch 保证一定有Response invoke: { if (!boltServer.isStarted()) { // 服务端已关闭 throwable = new SofaRpcException(RpcErrorType.SERVER_CLOSED, LogCodes.getLog( LogCodes.WARN_PROVIDER_STOPPED, SystemInfo.getLocalHost() + ":" + boltServer.serverConfig.getPort())); response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage()); break invoke; } if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的请求的逻辑 throwable = clientTimeoutWhenReceiveRequest(appName, serviceName, bizCtx.getRemoteAddress()); break invoke; } // 查找服务 //在server.registerProcessor方法中设置 ProviderProxyInvoker Invoker invoker = boltServer.findInvoker(serviceName); if (invoker == null) { throwable = cannotFoundService(appName, serviceName); response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage()); break invoke; } if (invoker instanceof ProviderProxyInvoker) { providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig(); // 找到服务后,打印服务的appName appName = providerConfig != null ? providerConfig.getAppName() : null; } // 查找方法 String methodName = request.getMethodName(); //在server.registerProcessor方法中设置 Method serviceMethod = ReflectCache.getOverloadMethodCache(serviceName, methodName, request.getMethodArgSigs()); if (serviceMethod == null) { throwable = cannotFoundServiceMethod(appName, methodName, serviceName); response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage()); break invoke; } else { request.setMethod(serviceMethod); } // 真正调用 response = doInvoke(serviceName, invoker, request); if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的响应的逻辑 throwable = clientTimeoutWhenSendResponse(appName, serviceName, bizCtx.getRemoteAddress()); break invoke; } } } catch (Exception e) { // 服务端异常,不管是啥异常 LOGGER.errorWithApp(appName, "Server Processor Error!", e); throwable = e; response = MessageBuilder.buildSofaErrorResponse(e.getMessage()); } // Response不为空,代表需要返回给客户端 if (response != null) { RpcInvokeContext invokeContext = RpcInvokeContext.peekContext(); isAsyncChain = CommonUtils.isTrue(invokeContext != null ? (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null); // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的 if (!isAsyncChain) { // 其它正常请求 try { // 这个try-catch 保证一定要记录tracer asyncCtx.sendResponse(response); } finally { if (EventBus.isEnable(ServerSendEvent.class)) { EventBus.post(new ServerSendEvent(request, response, throwable)); } } } } } catch (Throwable e) { // 可能有返回时的异常 if (LOGGER.isErrorEnabled(appName)) { LOGGER.errorWithApp(appName, e.getMessage(), e); } } finally { processingCount.decrementAndGet(); if (!isAsyncChain) { if (EventBus.isEnable(ServerEndHandleEvent.class)) { EventBus.post(new ServerEndHandleEvent()); } } RpcInvokeContext.removeContext(); RpcInternalContext.removeAllContext(); } }

这个方法主要做了如下几件事:

设置上下文参数

从缓存中得到服务暴露时设置的invoker

为request设置method参数

调用doInvoke返回response

将response返回给客户端

BoltServerProcessor#doInvoke

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwjsp.html