看了这篇你就会手写RPC框架了 (2)

服务发现者使用Zookeeper来实现,通过ZkClient我们很容易发现已经注册在ZK上的服务。当然我们也可以使用其他组件作为注册中心,例如Redis。

3.3.2.2 网络客户端 /** * 网络请求客户端,定义网络请求规范 * * @author 东方雨倾 * @since 1.0.0 */ public interface NetClient { byte[] sendRequest(byte[] data, Service service) throws InterruptedException; } /** * Netty网络请求客户端,定义通过Netty实现网络请求的细则。 * * @author 东方雨倾 * @since 1.0.0 */ public class NettyNetClient implements NetClient { private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class); /** * 发送请求 * * @param data 请求数据 * @param service 服务信息 * @return 响应数据 * @throws InterruptedException 异常 */ @Override public byte[] sendRequest(byte[] data, Service service) throws InterruptedException { String[] addInfoArray = service.getAddress().split(":"); String serverAddress = addInfoArray[0]; String serverPort = addInfoArray[1]; SendHandler sendHandler = new SendHandler(data); byte[] respData; // 配置客户端 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(sendHandler); } }); // 启动客户端连接 b.connect(serverAddress, Integer.parseInt(serverPort)).sync(); respData = (byte[]) sendHandler.rspData(); logger.info("SendRequest get reply: {}", respData); } finally { // 释放线程组资源 group.shutdownGracefully(); } return respData; } } /** * 发送处理类,定义Netty入站处理细则 * * @author 东方雨倾 * @since 1.0.0 */ public class SendHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(SendHandler.class); private CountDownLatch cdl; private Object readMsg = null; private byte[] data; public SendHandler(byte[] data) { cdl = new CountDownLatch(1); this.data = data; } /** * 当连接服务端成功后,发送请求数据 * * @param ctx 通道上下文 */ @Override public void channelActive(ChannelHandlerContext ctx) { logger.info("Successful connection to server:{}", ctx); ByteBuf reqBuf = Unpooled.buffer(data.length); reqBuf.writeBytes(data); logger.info("Client sends message:{}", reqBuf); ctx.writeAndFlush(reqBuf); } /** * 读取数据,数据读取完毕释放CD锁 * * @param ctx 上下文 * @param msg ByteBuf */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { logger.info("Client reads message: {}", msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] resp = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(resp); readMsg = resp; cdl.countDown(); } /** * 等待读取数据完成 * * @return 响应数据 * @throws InterruptedException 异常 */ public Object rspData() throws InterruptedException { cdl.await(); return readMsg; } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); logger.error("Exception occurred:{}", cause.getMessage()); ctx.close(); } }

在这里我们使用Netty来实现网络请求客户端,当然也可以使用Mina。网络请求客户端能连接远程服务端,并将编组好的请求数据发送给服务端,待服务端处理好后,又将服务端的响应数据返回给客户端。

3.3.2.3 服务代理 /** * 客户端代理工厂:用于创建远程服务代理类 * 封装编组请求、请求发送、编组响应等操作。 * * @author 东方雨倾 * @since 1.0.0 */ public class ClientProxyFactory { private ServiceDiscoverer serviceDiscoverer; private Map<String, MessageProtocol> supportMessageProtocols; private NetClient netClient; private Map<Class<?>, Object> objectCache = new HashMap<>(); /** * 通过Java动态代理获取服务代理类 * * @param clazz 被代理类Class * @param <T> 泛型 * @return 服务代理类 */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T) this.objectCache.computeIfAbsent(clazz, cls -> newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, new ClientInvocationHandler(cls))); } // getter setter ... /** * 客户端服务代理类invoke函数细节实现 */ private class ClientInvocationHandler implements InvocationHandler { private Class<?> clazz; private Random random = new Random(); public ClientInvocationHandler(Class<?> clazz) { super(); this.clazz = clazz; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Exception { if (method.getName().equals("toString")) { return proxy.getClass().toString(); } if (method.getName().equals("hashCode")) { return 0; } // 1、获得服务信息 String serviceName = this.clazz.getName(); List<Service> services = serviceDiscoverer.getServices(serviceName); if (services == null || services.isEmpty()) { throw new LeisureException("No provider available!"); } // 随机选择一个服务提供者(软负载均衡) Service service = services.get(random.nextInt(services.size())); // 2、构造request对象 LeisureRequest req = new LeisureRequest(); req.setServiceName(service.getName()); req.setMethod(method.getName()); req.setParameterTypes(method.getParameterTypes()); req.setParameters(args); // 3、协议层编组 // 获得该方法对应的协议 MessageProtocol protocol = supportMessageProtocols.get(service.getProtocol()); // 编组请求 byte[] data = protocol.marshallingRequest(req); // 4、调用网络层发送请求 byte[] repData = netClient.sendRequest(data, service); // 5解组响应消息 LeisureResponse rsp = protocol.unmarshallingResponse(repData); // 6、结果处理 if (rsp.getException() != null) { throw rsp.getException(); } return rsp.getReturnValue(); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpygwy.html