基于Netty4手把手实现一个带注册中心和注解的Dubbo框架 (5)

修改RpcServerProperties,增加注册中心的配置

@Data @ConfigurationProperties(prefix = "gp.rpc") public class RpcServerProperties { private int servicePort; private byte registerType; private String registryAddress; } RpcProviderAutoConfiguration

增加注册中心的注入。

@Configuration @EnableConfigurationProperties(RpcServerProperties.class) public class RpcProviderAutoConfiguration { @Bean public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException { //添加注册中心 IRegistryService registryService=RegistryFactory.createRegistryService(rpcServerProperties.getRegistryAddress(), RegistryType.findByCode(rpcServerProperties.getRegisterType())); return new SpringRpcProviderBean(rpcServerProperties.getServicePort(),registryService); } } application.properties

修改netty-rpc-provider中的application.properties。

gp.rpc.servicePort=20880 gp.rpc.registerType=0 gp.rpc.registryAddress=192.168.221.128:2181 修改客户端,增加服务发现

客户端需要修改的地方较多,下面这些修改的代码,都是netty-rpc-protocol模块中的类。

RpcClientProperties

增加注册中心类型和注册中心地址的选项

@Data public class RpcClientProperties { private String serviceAddress="192.168.1.102"; private int servicePort=20880; private byte registryType; private String registryAddress; } 修改NettyClient

原本是静态地址,现在修改成了从注册中心获取地址

@Slf4j public class NettyClient { private final Bootstrap bootstrap; private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup(); /* private String serviceAddress; private int servicePort;*/ public NettyClient(){ log.info("begin init NettyClient"); bootstrap=new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new RpcClientInitializer()); /* this.serviceAddress=serviceAddress; this.servicePort=servicePort;*/ } public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception { ServiceInfo serviceInfo=registryService.discovery(protocol.getContent().getClassName()); ChannelFuture future=bootstrap.connect(serviceInfo.getServiceAddress(),serviceInfo.getServicePort()).sync(); future.addListener(listener->{ if(future.isSuccess()){ log.info("connect rpc server {} success.",serviceInfo.getServiceAddress()); }else{ log.error("connect rpc server {} failed .",serviceInfo.getServiceAddress()); future.cause().printStackTrace(); eventLoopGroup.shutdownGracefully(); } }); log.info("begin transfer data"); future.channel().writeAndFlush(protocol); } } 修改RpcInvokerProxy

将静态ip和地址,修改成IRegistryService

@Slf4j public class RpcInvokerProxy implements InvocationHandler { /* private String serviceAddress; private int servicePort;*/ IRegistryService registryService; public RpcInvokerProxy(IRegistryService registryService) { /* this.serviceAddress = serviceAddress; this.servicePort = servicePort;*/ this.registryService=registryService; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.info("begin invoke target server"); //组装参数 RpcProtocol<RpcRequest> protocol=new RpcProtocol<>(); long requestId= RequestHolder.REQUEST_ID.incrementAndGet(); Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0); protocol.setHeader(header); RpcRequest request=new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParams(args); protocol.setContent(request); //发送请求 NettyClient nettyClient=new NettyClient(); //构建异步数据处理 RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop())); RequestHolder.REQUEST_MAP.put(requestId,future); nettyClient.sendRequest(protocol,this.registryService); return future.getPromise().get().getData(); } } SpringRpcReferenceBean

修改引用bean,增加注册中心配置

public class SpringRpcReferenceBean implements FactoryBean<Object> { private Class<?> interfaceClass; private Object object; /* private String serviceAddress; private int servicePort;*/ //修改增加注册中心 private byte registryType; private String registryAddress; @Override public Object getObject() throws Exception { return object; } public void init(){ //修改增加注册中心 IRegistryService registryService= RegistryFactory.createRegistryService(this.registryAddress, RegistryType.findByCode(this.registryType)); this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(), new Class<?>[]{this.interfaceClass}, new RpcInvokerProxy(registryService)); } @Override public Class<?> getObjectType() { return this.interfaceClass; } public void setInterfaceClass(Class<?> interfaceClass) { this.interfaceClass = interfaceClass; } /* public void setServiceAddress(String serviceAddress) { this.serviceAddress = serviceAddress; } public void setServicePort(int servicePort) { this.servicePort = servicePort; }*/ public void setRegistryType(byte registryType) { this.registryType = registryType; } public void setRegistryAddress(String registryAddress) { this.registryAddress = registryAddress; } } SpringRpcReferencePostProcessor @Slf4j public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor { private ApplicationContext context; private ClassLoader classLoader; private RpcClientProperties clientProperties; public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) { this.clientProperties = clientProperties; } //保存发布的引用bean信息 private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>(); @Override public void setBeanClassLoader(ClassLoader classLoader) { this.classLoader=classLoader; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.context=applicationContext; } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){ //遍历bean定义,然后获取到加载的bean,遍历这些bean中的字段,是否携带GpRemoteReference注解 //如果有,则需要构建一个动态代理实现 BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname); String beanClassName=beanDefinition.getBeanClassName(); if(beanClassName!=null){ Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader); ReflectionUtils.doWithFields(clazz,this::parseRpcReference); } } //将@GpRemoteReference注解的bean,构建一个动态代理对象 BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory; this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{ if(context.containsBean(beanName)){ log.warn("SpringContext already register bean {}",beanName); return; } registry.registerBeanDefinition(beanName,beanDefinition); log.info("registered RpcReferenceBean {} success.",beanName); }); } private void parseRpcReference(Field field){ GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class); if(gpRemoteReference!=null) { BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class); builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME); builder.addPropertyValue("interfaceClass",field.getType()); /*builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress()); builder.addPropertyValue("servicePort",clientProperties.getServicePort());*/ builder.addPropertyValue("registryType",clientProperties.getRegistryType()); builder.addPropertyValue("registryAddress",clientProperties.getRegistryAddress()); BeanDefinition beanDefinition=builder.getBeanDefinition(); rpcRefBeanDefinitions.put(field.getName(),beanDefinition); } } } RpcRefernceAutoConfiguration @Configuration public class RpcRefernceAutoConfiguration implements EnvironmentAware{ @Bean public SpringRpcReferencePostProcessor postProcessor(){ String address=environment.getProperty("gp.serviceAddress"); int port=Integer.parseInt(environment.getProperty("gp.servicePort")); RpcClientProperties rc=new RpcClientProperties(); rc.setServiceAddress(address); rc.setServicePort(port); rc.setRegistryType(Byte.parseByte(environment.getProperty("gp.registryType"))); rc.setRegistryAddress(environment.getProperty("gp.registryAddress")); return new SpringRpcReferencePostProcessor(rc); } private Environment environment; @Override public void setEnvironment(Environment environment) { this.environment=environment; } } application.properties

修改netty-rpc-consumer模块中的配置

gp.serviceAddress=192.168.1.102 gp.servicePort=20880 gp.registryType=0 gp.registryAddress=192.168.221.128:2181 负载均衡的测试

增加一个服务端的启动类,并且修改端口。然后客户端不需要重启的情况下刷新浏览器,即可看到负载均衡的效果。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzpjzj.html