由于服务发现时得到的服务端实例可能有多个,所以需要用负载均衡算法从中选择一个。
ILoadBalance

/**
 * Strategy for picking one element out of a list of candidates.
 *
 * @param <T> type of the candidates
 */
public interface ILoadBalance<T> {

    /**
     * Chooses one candidate from {@code servers}.
     *
     * @param servers candidates to choose from
     * @return the chosen candidate
     */
    T select(List<T> servers);
}

AbstractLoadBalance

/**
 * Base class for load balancers over {@code ServiceInstance<ServiceInfo>}.
 * It handles the trivial cases (no candidates, exactly one candidate) and
 * delegates the real choice to {@link #doSelect(List)}.
 *
 * <p>NOTE(review): the class name is misspelled ("Banalce"); it is kept as-is
 * because {@code RandomLoadBalance} extends it under this name.
 */
public abstract class AbstractLoadBanalce implements ILoadBalance<ServiceInstance<ServiceInfo>> {

    /**
     * @param servers candidates; may be {@code null} or empty
     * @return {@code null} when there are no candidates, the only element when
     *         there is exactly one, otherwise the result of {@link #doSelect(List)}
     */
    @Override
    public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        if (servers.size() == 1) {
            return servers.get(0);
        }
        return doSelect(servers);
    }

    /**
     * Picks one instance; only called by {@link #select(List)} with a list
     * holding at least two elements.
     *
     * @param servers candidates to choose from
     * @return the chosen instance
     */
    protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
}

RandomLoadBalance

/**
 * Load balancer that picks a uniformly random instance.
 */
public class RandomLoadBalance extends AbstractLoadBanalce {

    /**
     * @param servers candidates to choose from
     * @return the element at a random index in {@code [0, servers.size())}
     */
    @Override
    protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {
        // ThreadLocalRandom avoids allocating and seeding a new Random on every call.
        // Fully qualified so that no additional import is required.
        int index = java.util.concurrent.ThreadLocalRandom.current().nextInt(servers.size());
        return servers.get(index);
    }
}

RegistryType

/**
 * Supported registry-center implementations, each identified by a byte code.
 */
public enum RegistryType {

    ZOOKEEPER((byte) 0),
    EUREKA((byte) 1);

    private final byte code;

    RegistryType(byte code) {
        this.code = code;
    }

    /**
     * @return the byte code of this registry type
     */
    public byte code() {
        return this.code;
    }

    /**
     * Looks up a registry type by its byte code.
     *
     * @param code the code to look up
     * @return the matching type, or {@code null} when no type has that code
     */
    public static RegistryType findByCode(byte code) {
        for (RegistryType rt : RegistryType.values()) {
            if (rt.code() == code) {
                return rt;
            }
        }
        return null;
    }
}

RegistryFactory

/**
 * Factory creating {@code IRegistryService} instances for a registry type.
 */
public class RegistryFactory {

    /**
     * Creates the registry service for the given type.
     *
     * <p>Creation is best-effort: any exception raised while creating the
     * service is printed and {@code null} is returned. {@code null} is also
     * returned for {@code EUREKA}, which is not implemented yet.
     *
     * @param address      address handed to the registry implementation
     * @param registryType which registry implementation to create
     * @return the created service, or {@code null} when none could be created
     */
    public static IRegistryService createRegistryService(String address, RegistryType registryType) {
        IRegistryService registryService = null;
        try {
            switch (registryType) {
                case EUREKA:
                    // TODO: Eureka is not supported yet; registryService stays null.
                    break;
                case ZOOKEEPER:
                default:
                    // ZooKeeper is both the explicit choice and the fallback.
                    registryService = new ZookeeperRegistryService(address);
                    break;
            }
        } catch (Exception e) {
            // Best-effort: the failure is reported and null is returned to the caller.
            e.printStackTrace();
        }
        return registryService;
    }
}

修改服务端增加服务注册修改netty-rpc-protocol模块,加入注册中心的支持
SpringRpcProviderBean 按照下面代码中注释标注的部分进行修改,标注处即为要修改的内容。
/**
 * Spring bean that starts the Netty RPC server and publishes every bean
 * annotated with {@code GpRemoteService}: each declared method is stored in
 * {@code Mediator.beanMethodMap} and the service is registered with the
 * registry service.
 */
@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress;
    // Modified part: registry-center support added.
    private final IRegistryService registryService;

    /**
     * @param serverPort      port the Netty server listens on
     * @param registryService registry used to publish services (modified part:
     *                        registry-center support added)
     * @throws UnknownHostException when the local host address cannot be resolved
     */
    public SpringRpcProviderBean(int serverPort, IRegistryService registryService) throws UnknownHostException {
        this.serverPort = serverPort;
        this.serverAddress = InetAddress.getLocalHost().getHostAddress();
        this.registryService = registryService;
    }

    /**
     * Starts the Netty server on a new thread; a startup failure is logged on
     * that thread and not propagated to the caller.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("begin deploy Netty Server to host {},on port {}", this.serverAddress, this.serverPort);
        new Thread(() -> {
            try {
                new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
            } catch (Exception e) {
                // log.error already records the stack trace; no printStackTrace needed.
                log.error("start Netty Server Occur Exception,", e);
            }
        }).start();
    }

    /**
     * Publishes beans annotated with {@code GpRemoteService}.
     *
     * <p>Every declared method is stored under the key
     * {@code <first interface name>.<method name>}; afterwards the service is
     * registered once with the registry service.
     *
     * @param bean     the bean instance
     * @param beanName the bean name
     * @return {@code bean}, unchanged
     * @throws IllegalStateException when an annotated bean with declared
     *                               methods implements no interface
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> beanClass = bean.getClass();
        // Only services carrying the annotation are published.
        if (!beanClass.isAnnotationPresent(GpRemoteService.class)) {
            return bean;
        }

        Method[] methods = beanClass.getDeclaredMethods();
        if (methods.length == 0) {
            // Nothing to publish.
            return bean;
        }

        Class<?>[] interfaces = beanClass.getInterfaces();
        if (interfaces.length == 0) {
            // The service name is taken from the first interface; fail with a
            // clear message instead of an ArrayIndexOutOfBoundsException.
            throw new IllegalStateException(
                    "Bean '" + beanName + "' (" + beanClass.getName()
                            + ") is annotated as a remote service but implements no interface");
        }
        String serviceName = interfaces[0].getName();

        for (Method method : methods) {
            BeanMethod beanMethod = new BeanMethod();
            beanMethod.setBean(bean);
            beanMethod.setMethod(method);
            Mediator.beanMethodMap.put(serviceName + "." + method.getName(), beanMethod);
        }

        // The registration data is the same for every method of the service,
        // so it is registered once per bean rather than once per method.
        registerService(serviceName);
        return bean;
    }

    /**
     * Registers the service with the registry service (modified part:
     * registry-center support added). A failure is logged and does not
     * prevent the bean from being created.
     */
    private void registerService(String serviceName) {
        try {
            ServiceInfo serviceInfo = new ServiceInfo();
            serviceInfo.setServiceAddress(this.serverAddress);
            serviceInfo.setServicePort(this.serverPort);
            serviceInfo.setServiceName(serviceName);
            registryService.register(serviceInfo);
        } catch (Exception e) {
            log.error("register service {} faild", serviceName, e);
        }
    }
}

RpcServerProperties