需要在RpcConstant常量中增加一个INIT_METHOD_NAME属性
public class RpcConstant { //header部分的总字节数 public final static int HEAD_TOTAL_LEN=16; //魔数 public final static short MAGIC=0xca; public static final String INIT_METHOD_NAME = "init"; } RpcClientProperties @Data public class RpcClientProperties { private String serviceAddress="192.168.1.102"; private int servicePort=20880; } RpcRefernceAutoConfiguration @Configuration public class RpcRefernceAutoConfiguration implements EnvironmentAware{ @Bean public SpringRpcReferencePostProcessor postProcessor(){ String address=environment.getProperty("gp.serviceAddress"); int port=Integer.parseInt(environment.getProperty("gp.servicePort")); RpcClientProperties rc=new RpcClientProperties(); rc.setServiceAddress(address); rc.setServicePort(port); return new SpringRpcReferencePostProcessor(rc); } private Environment environment; @Override public void setEnvironment(Environment environment) { this.environment=environment; } } netty-rpc-consumer修改netty-rpc-consumer模块
把该模块变成一个spring boot项目
增加web依赖
添加测试类
图7-3 netty-rpc-consumer模块
引入jar包依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
HelloController
/**
 * Test endpoint: {@code GET /test} calls {@code saveUser("Mic")} on the
 * {@code IUserService} field annotated with {@code @GpRemoteReference}
 * and returns the result.
 */
@RestController
public class HelloController {
    @GpRemoteReference
    private IUserService userService;

    @GetMapping("/test")
    public String test(){
        return userService.saveUser("Mic");
    }
}
NettyConsumerMain
/**
 * Spring Boot entry point of the consumer module; component scanning is limited to the
 * packages listed in {@code @ComponentScan}.
 */
@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.controller","com.example.spring.reference"})
@SpringBootApplication
public class NettyConsumerMain {
    public static void main(String[] args) {
        SpringApplication.run(NettyConsumerMain.class, args);
    }
}
application.properties
gp.serviceAddress=192.168.1.102
gp.servicePort=20880
访问测试
启动Netty-Rpc-Server
启动Netty-Rpc-Consumer
如果启动过程没有任何问题,则可以访问HelloController来测试远程服务的访问。
引入注册中心
创建一个netty-rpc-registry模块,代码结构如图7-4所示。
引入相关依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>4.2.0</version> </dependency> IRegistryService public interface IRegistryService { /** * 注册服务 * @param serviceInfo * @throws Exception */ void register(ServiceInfo serviceInfo) throws Exception; /** * 取消注册 * @param serviceInfo * @throws Exception */ void unRegister(ServiceInfo serviceInfo) throws Exception; /** * 动态发现服务 * @param serviceName * @return * @throws Exception */ ServiceInfo discovery(String serviceName) throws Exception; } ServiceInfo @Data public class ServiceInfo { private String serviceName; private String serviceAddress; private int servicePort; } ZookeeperRegistryService @Slf4j public class ZookeeperRegistryService implements IRegistryService { private static final String REGISTRY_PATH="/registry"; //Curator中提供的服务注册与发现的组件封装,它对此抽象出了ServiceInstance、 // ServiceProvider、ServiceDiscovery三个接口,通过它我们可以很轻易的实现Service Discovery private final ServiceDiscovery<ServiceInfo> serviceDiscovery; private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance; public ZookeeperRegistryService(String registryAddress) throws Exception { CuratorFramework client= CuratorFrameworkFactory .newClient(registryAddress,new ExponentialBackoffRetry(1000,3)); JsonInstanceSerializer<ServiceInfo> serializer=new JsonInstanceSerializer<>(ServiceInfo.class); this.serviceDiscovery= ServiceDiscoveryBuilder.builder(ServiceInfo.class) .client(client) .serializer(serializer) .basePath(REGISTRY_PATH) .build(); this.serviceDiscovery.start(); loadBalance=new RandomLoadBalance(); } @Override public void register(ServiceInfo serviceInfo) throws Exception { log.info("开始注册服务,{}",serviceInfo); ServiceInstance<ServiceInfo> 
serviceInstance=ServiceInstance .<ServiceInfo>builder().name(serviceInfo.getServiceName()) .address(serviceInfo.getServiceAddress()) .port(serviceInfo.getServicePort()) .payload(serviceInfo) .build(); serviceDiscovery.registerService(serviceInstance); } @Override public void unRegister(ServiceInfo serviceInfo) throws Exception { ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance.<ServiceInfo>builder() .name(serviceInfo.getServiceName()) .address(serviceInfo.getServiceAddress()) .port(serviceInfo.getServicePort()) .payload(serviceInfo) .build(); serviceDiscovery.unregisterService(serviceInstance); } @Override public ServiceInfo discovery(String serviceName) throws Exception { Collection<ServiceInstance<ServiceInfo>> serviceInstances= serviceDiscovery .queryForInstances(serviceName); //通过负载均衡返回某个具体实例 ServiceInstance<ServiceInfo> serviceInstance=loadBalance.select((List<ServiceInstance<ServiceInfo>>)serviceInstances); if(serviceInstance!=null){ return serviceInstance.getPayload(); } return null; } } 引入负载均衡算法