RoundRobinWithRequestSeparatedPositionLoadBalancer
//一定必须是实现ReactorServiceInstanceLoadBalancer //而不是ReactorLoadBalancer<ServiceInstance> //因为注册的时候是ReactorServiceInstanceLoadBalancer @Log4j2 public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer { private final ServiceInstanceListSupplier serviceInstanceListSupplier; //每次请求算上重试不会超过1分钟 //对于超过1分钟的,这种请求肯定比较重,不应该重试 private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) //随机初始值,防止每次都是从第一个开始调用 .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000))); private final String serviceId; private final Tracer tracer; public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) { this.serviceInstanceListSupplier = serviceInstanceListSupplier; this.serviceId = serviceId; this.tracer = tracer; } @Override public Mono<Response<ServiceInstance>> choose(Request request) { return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances)); } private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } return getInstanceResponseByRoundRobin(serviceInstances); } private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例 Span currentSpan = tracer.currentSpan(); if (currentSpan == null) { currentSpan = tracer.newTrace(); } long l = currentSpan.context().traceId(); AtomicInteger seed = positionCache.get(l); int s = seed.getAndIncrement(); int pos = s % serviceInstances.size(); log.info("position {}, seed: {}, instances count: {}", pos, s, 
serviceInstances.size()); return new DefaultResponse(serviceInstances.stream() //实例返回列表顺序可能不同,为了保持一致,先排序再取 .sorted(Comparator.comparing(ServiceInstance::getInstanceId)) .collect(Collectors.toList()).get(pos)); } } 将上述两个元素加入我们自定义的 LoadBalancerClient 并启用在上一节,我们提到了可以通过 @LoadBalancerClients 注解配置默认的负载均衡器配置,我们这里就是通过这种方式进行配置。首先在 spring.factories 中添加自动配置类:
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration

然后编写这个自动配置类,其实很简单,就是添加一个 @LoadBalancerClients 注解,设置默认配置类:
LoadBalancerAutoConfiguration
/**
 * Auto-configuration class (listed in spring.factories) whose only job is to carry the
 * {@code @LoadBalancerClients} annotation that names {@code DefaultLoadBalancerConfiguration}
 * as the default load-balancer configuration. It declares no beans itself.
 */
@Configuration(proxyBeanMethods = false)
@LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)
public class LoadBalancerAutoConfiguration {
}

编写这个默认配置类,将上面我们实现的两个类,组装进去:
DefaultLoadBalancerConfiguration
@Configuration(proxyBeanMethods = false) public class DefaultLoadBalancerConfiguration { @Bean public ServiceInstanceListSupplier serviceInstanceListSupplier( DiscoveryClient discoveryClient, Environment env, ConfigurableApplicationContext context, LoadBalancerZoneConfig zoneConfig ) { ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); return //开启服务实例缓存 new CachingServiceInstanceListSupplier( //只能返回同一个 zone 的服务实例 new SameZoneOnlyServiceInstanceListSupplier( //启用通过 discoveryClient 的服务发现 new DiscoveryClientServiceInstanceListSupplier( discoveryClient, env ), zoneConfig ) , cacheManagerProvider.getIfAvailable() ); } @Bean public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer( Environment environment, ServiceInstanceListSupplier serviceInstanceListSupplier, Tracer tracer ) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new RoundRobinWithRequestSeparatedPositionLoadBalancer( serviceInstanceListSupplier, name, tracer ); } }