曹工说mini-dubbo(1)--为了实践动态代理,我写了个简单的rpc框架写在前面的话 (3)

exchange层,这层完成同步转异步的操作,目前只有一个实现:

@Service public class Sync2AsyncExchangeImpl implements ExchangeLayerRpcInvoker { public static ConcurrentHashMap<String, CompletableFuture<Object>> requestId2futureMap = new ConcurrentHashMap<>(); @Override public Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext) { String requestId = UUID.randomUUID().toString(); rpcContext.setRequestId(requestId); rpcContext.setRequestId2futureMap(requestId2futureMap); CompletableFuture<Object> completableFuture = new CompletableFuture<>(); requestId2futureMap.put(requestId, completableFuture); /** * 交给具体的底层去解决 */ TransportLayerRpcInvoker transportLayerRpcInvoker = SpiServiceLoader.loadService(TransportLayerRpcInvoker .class); transportLayerRpcInvoker.invoke(providerHostAndPort, rpcContext); Object s = null; try { s = completableFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return s; } }

这层大家可以简单理解为:主线程调用传输层之前,生成一个id和一个completablefuture,放到一个全局map,然后将id传给下层,然后在completablefuture上阻塞;下层拿到id后,在消息里传输;服务端再将id传输回来,然后客户端拿着id找到completablefuture,并唤醒主线程。

信息传输层,以netty为例,具体的netty相关的知识,大家就得自己先学习一下:

简单步骤如下:

//1.初始化客户端连接 public void initChannel() { Bootstrap b = configBootStrap(); ChannelFuture future = null; try { future = b.connect(providerHostAndPort.getHost(), providerHostAndPort.getPort()).sync(); if (future.isSuccess()) { channel = future.channel(); return; } } catch (InterruptedException e) { ... } throw new RuntimeException(); } private Bootstrap configBootStrap() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("lengthFieldPrepender", new LengthFieldPrepender(2)); p.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder( 65536, 0, 2, 0, 2)); p.addLast("decoder", new StringDecoder()); p.addLast("encoder", new StringEncoder()); p.addLast(new ClientHandler()); }//拦截器设置 }); return b; }

使用连接的channle,发送数据:

public void sendMessage(String messageContent) { synchronized (lockObj) { if (channel == null) { initChannel(); } } ChannelFuture channelFuture = channel.writeAndFlush(messageContent); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { System.out.println("发送请求消息成功"); } }); }

netty接收到服务端相应后,根据requestId来获取future,唤醒上层线程

@Slf4j public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext cx) { log.info("channelActive,local address:{},remote address:{}", cx.channel().localAddress(),cx.channel().remoteAddress()); } /** * 读取信息 * * @param ctx 渠道连接对象 * @param msg 信息 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ResponseVO responseVO = JSONObject.parseObject((String) msg, ResponseVO.class); String requestId = responseVO.getRequestId(); //1.获取future CompletableFuture<Object> completableFuture = Netty4ClientRpcInvoker.requestId2futureMap .get(requestId); //2.将结果塞进future,在此future上阻塞的线程被唤醒 completableFuture.complete(responseVO.getContent()); log.info("client channelRead,thread:{}", Thread.currentThread()); log.info("客户端端读写远程地址是-----------" + ctx.channel().remoteAddress() + "信息是:" + msg.toString()); } }

如何根据spi进行切换

之前我们提到了可以根据spi,随意切换实现,比如我们想使用mina来传输的话:

曹工说mini-dubbo(1)--为了实践动态代理,我写了个简单的rpc框架写在前面的话

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwjxw.html