曹工说mini-dubbo(1)--为了实践动态代理,我写了个简单的rpc框架写在前面的话 (2)

可以看到,我们这边是比dubbo少了几层,首先proxy,目前直接用了jdk动态代理,没有其他技术,所以就没有抽出一层;然后monitor层,现在肯定是没有的,这部分其实才是一个框架的重头戏,但是我也不会前端,所以这块估计暂时没有;接下来是protocol层,我暂时不太清楚dubbo的设计,所以就没弄这层。

知道了分层结构后,我们可以回到第一点,即动态代理那里,我们的动态代理,只依赖下层的接口。目前,各层之间的接口,放在mini-dubbo-common模块中,定义如下:

注册中心层,负责接收上层传来的调用参数等上下文,并返回结果

/** * 注册中心层的rpc调用者 * 1:接收上层传下来的业务参数,并返回结果 * * 本层:会根据不同实现,去相应的注册中心,获取匹配的服务提供者列表,传输给下一层 */ public interface RegistryLayerRpcInvoker { Object invoke(RpcContext rpcContext); }

集群层,接收上层注册中心层传来的服务提供者列表和rpc调用上下文,并返回最终结果

public interface ClusterLayerRpcInvoker { /** * 由注册中心层提供对应service的服务提供者列表,本方法可以根据负载均衡策略,进行筛选 * @param providerList * @param rpcContext * @return */ Object invoke(List<ProviderHostAndPort> providerList, RpcContext rpcContext); }

exchange层,上层集群层,会替我们选好某一台具体的服务提供者,然后让我们去调用,本层完成同步转异步

public interface ExchangeLayerRpcInvoker { /** * * @param providerHostAndPort 要调用的服务提供者的地址 * @param rpcContext rpc上下文,包含了要调用的参数等 * @return rpc调用的结果 */ Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext); }

传输层,本层目前有两个简单实现,netty和mina。

/** * * 本层为传输层,上层为exchange层。 * 上层exchange,目前有一个默认实现,主要是完成同步转异步的操作。 * 上层将具体的传输工作交给底层的传输层,比如netty和mina,然后在一个future上等待传输层完成工作 * * 本层会完成实际的发送工作和接收返回响应的工作 */ public interface TransportLayerRpcInvoker { /** * * @param providerHostAndPort 要调用的服务提供者的地址 * @param rpcContext rpc上下文,包含了要调用的参数等 * @return rpc调用的结果 */ Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext); }

其中,我们的最上边的动态代理层,只依赖于下层,其中,示例代码如下:

@Override public Object invoke(Object proxy, Method method, Object[] args) { // 1.从spring容器中,获取下层的实现bean;如果有多个,则根据spi文件中指定的为准 RegistryLayerRpcInvoker registryLayerRpcInvoker = SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class); RpcContext rpcContext = new RpcContext(); rpcContext.setProxy(proxy); rpcContext.setMethod(method); rpcContext.setArgs(args); rpcContext.setServiceName(method.getDeclaringClass().getName()); // 2.调用下层 Object o = registryLayerRpcInvoker.invoke(rpcContext); return o; }

这里1处,可以看到,我们通过SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class)去获取具体的下层实现,这是我们自定义的一个工具类,其内部实现一会再说。

2处调用下层实现,获取结果。

registry,注册中心层的实现

@Service public class RedisRegistryRpcInvoker implements RegistryLayerRpcInvoker { @Autowired private RedisRegistry redisRegistry; @Override public Object invoke(RpcContext rpcContext) { //1.获取集群层实现 ClusterLayerRpcInvoker clusterLayerRpcInvoker = SpiServiceLoader.loadService(ClusterLayerRpcInvoker.class); //2.从redis中,根据服务名,获取服务提供者列表 List<ProviderHostAndPort> list = redisRegistry.getServiceProviderList(rpcContext.getServiceName()); if (CollectionUtils.isEmpty(list)) { throw new RuntimeException(); } //2.调用集群层实现,获取结果 Object o = clusterLayerRpcInvoker.invoke(list, rpcContext); return o; } }

集群层实现,本层我也不算懂,模仿dubbo实现了一下。

主要实现了以下两种:

Failover,出现失败,立即重试其他服务器。可以设置重试次数。

Failfast,请求失败以后,返回异常结果,不进行重试。

以failover为例:

@Slf4j @Service public class FailoverClusterLayerRpcInvoker implements ClusterLayerRpcInvoker { @Autowired private LoadBalancePolicy loadBalancePolicy; @Override public Object invoke(List<ProviderHostAndPort> providerList, RpcContext rpcContext) { ExchangeLayerRpcInvoker exchangeLayerRpcInvoker = SpiServiceLoader.loadService(ExchangeLayerRpcInvoker.class); int retryTimes = 3; for (int i = 0; i < retryTimes; i++) { // 1.根据负载均衡策略,选择1台服务提供者 ProviderHostAndPort providerHostAndPort = loadBalancePolicy.selectOne(providerList); try { // 调用下层,获取结果 Object o = exchangeLayerRpcInvoker.invoke(providerHostAndPort, rpcContext); return o; } catch (Exception e) { log.error("fail to invoke {},exception:{},will try another", providerHostAndPort,e); // 2.如果调用失败,进入下一次循环 continue; } } throw new RuntimeException("fail times extend"); } }

其中,一共会尝试3次,每次的逻辑:根据负载均衡策略,选择1台去调用;如果有问题,则换一台。

调用下层时,获取了下层的接口:ExchangeLayerRpcInvoker

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwjxw.html