看了这篇你就会手写RPC框架了 (4)

下面是他们的实现代码:

/** * 被该注解标记的服务可提供远程RPC访问的能力 * * @author 东方雨倾 * @since 1.0.0 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface Service { String value() default ""; } /** * 该注解用于注入远程服务 * * @author 东方雨倾 * @since 1.0.0 */ @Target({ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface InjectService { } 3.4.3.1 服务注册(暴露) /** * 服务注册器,定义服务注册规范 * * @author 东方雨倾 * @since 1.0.0 */ public interface ServiceRegister { void register(ServiceObject so) throws Exception; ServiceObject getServiceObject(String name) throws Exception; } /** * 默认服务注册器 * * @author 东方雨倾 * @since 1.0.0 */ public class DefaultServiceRegister implements ServiceRegister { private Map<String, ServiceObject> serviceMap = new HashMap<>(); protected String protocol; protected Integer port; @Override public void register(ServiceObject so) throws Exception { if (so == null) { throw new IllegalArgumentException("Parameter cannot be empty."); } this.serviceMap.put(so.getName(), so); } @Override public ServiceObject getServiceObject(String name) { return this.serviceMap.get(name); } } /** * Zookeeper服务注册器,提供服务注册、服务暴露的能力 * * @author 东方雨倾 * @since 1.0.0 */ public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister { /** * Zk客户端 */ private ZkClient client; public ZookeeperExportServiceRegister(String zkAddress, Integer port, String protocol) { client = new ZkClient(zkAddress); client.setZkSerializer(new ZookeeperSerializer()); this.port = port; this.protocol = protocol; } /** * 服务注册 * * @param so 服务持有者 * @throws Exception 注册异常 */ @Override public void register(ServiceObject so) throws Exception { super.register(so); Service service = new Service(); String host = InetAddress.getLocalHost().getHostAddress(); String address = host + ":" + port; service.setAddress(address); service.setName(so.getClazz().getName()); service.setProtocol(protocol); this.exportService(service); } /** * 服务暴露 * * @param serviceResource 需要暴露的服务信息 */ private void exportService(Service serviceResource) { String serviceName = serviceResource.getName(); String uri = JSON.toJSONString(serviceResource); try { uri = URLEncoder.encode(uri, UTF_8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } String servicePath = ZK_SERVICE_PATH + PATH_DELIMITER + serviceName + "/service"; if (!client.exists(servicePath)) { client.createPersistent(servicePath, true); } String uriPath = servicePath + PATH_DELIMITER + uri; if (client.exists(uriPath)) { client.delete(uriPath); } client.createEphemeral(uriPath); } }

这个过程其实没啥好说的,就是将指定ServiceObject对象序列化后保存到ZK上,供客户端发现。同时会将服务对象缓存起来,在客户端调用服务时,通过缓存的ServiceObject对象反射指定服务,调用方法。

3.4.3.2 网络服务 /** * RPC服务端抽象类 * * @author 东方雨倾 * @since 1.0.0 */ public abstract class RpcServer { /** * 服务端口 */ protected int port; /** * 服务协议 */ protected String protocol; /** * 请求处理者 */ protected RequestHandler handler; public RpcServer(int port, String protocol, RequestHandler handler) { super(); this.port = port; this.protocol = protocol; this.handler = handler; } /** * 开启服务 */ public abstract void start(); /** * 停止服务 */ public abstract void stop(); // getter setter ... } /** * Netty RPC服务端,提供Netty网络服务开启、关闭的能力 * * @author 东方雨倾 * @since 1.0.0 */ public class NettyRpcServer extends RpcServer { private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class); private Channel channel; public NettyRpcServer(int port, String protocol, RequestHandler handler) { super(port, protocol, handler); } @Override public void start() { // 配置服务器 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(new ChannelRequestHandler()); } }); // 启动服务 ChannelFuture f = b.bind(port).sync(); logger.info("Server started successfully."); channel = f.channel(); // 等待服务通道关闭 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 释放线程组资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } @Override public void stop() { this.channel.close(); } private class ChannelRequestHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { logger.info("Channel active:{}", ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("The server receives a message: {}", msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] req = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(req); byte[] res = handler.handleRequest(req); logger.info("Send response:{}", msg); ByteBuf respBuf = Unpooled.buffer(res.length); respBuf.writeBytes(res); ctx.write(respBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); logger.error("Exception occurred:{}", cause.getMessage()); ctx.close(); } } } /** * 请求处理者,提供解组请求、编组响应等操作 * * @author 东方雨倾 * @since 1.0.0 */ public class RequestHandler { private MessageProtocol protocol; private ServiceRegister serviceRegister; public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) { super(); this.protocol = protocol; this.serviceRegister = serviceRegister; } public byte[] handleRequest(byte[] data) throws Exception { // 1、解组消息 LeisureRequest req = this.protocol.unmarshallingRequest(data); // 2、查找服务对象 ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName()); LeisureResponse rsp = null; if (so == null) { rsp = new LeisureResponse(LeisureStatus.NOT_FOUND); } else { // 3、反射调用对应的过程方法 try { Method m = so.getClazz().getMethod(req.getMethod(), req.getParameterTypes()); Object returnValue = m.invoke(so.getObj(), req.getParameters()); rsp = new LeisureResponse(LeisureStatus.SUCCESS); rsp.setReturnValue(returnValue); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { rsp = new LeisureResponse(LeisureStatus.ERROR); rsp.setException(e); } } // 4、编组响应消息 return this.protocol.marshallingResponse(rsp); } // getter setter ... }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpygwy.html