上一讲 我们通过单元测试 来梳理了EurekaClient是如何注册到server端,以及server端接收到请求是如何处理的,这里最重要的关注点是注册表的一个数据结构:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
本讲目录回头看了下之前的博客,没有一个总目录说明,每篇都是直接源码分析了。从此篇文章开始都会加上目录,以及文章最后会加上总结及读此篇源码的感受。希望这个博客系列的文章会越来越好。
目录如下:
client端第一次注册全量抓取注册表的逻辑
server端返回注册表信息集合的多级缓存机制
server端注册表多级缓存过期机制:主动+定时+被动
client端增量抓取注册表逻辑
技术亮点:
注册表抓取的多级缓存机制
增量抓取返回的全量数据hashCode,和本地数据hashCode对比,保证数据一致性
这里再啰嗦一点,之前一直吐槽EurekaClient注册的逻辑,今天看了EurekaClient注册表抓取的逻辑后,不由的感叹设计的精妙之处,这里说的精妙是指EurekaServer端对于注册表读取逻辑的设计,缓存逻辑以及增量获取时Hash一致性的判断,真的很妙,感觉又学到了不少东西。读完这段代码 一大早就很兴奋,哈哈哈,一起看看吧。
说明原创不易,如若转载 请标明来源:一枝花算不算浪漫
EurekaClient全量抓取注册表逻辑一直在想着怎么才能把自己看完代码后的理解用文字表达出来,这里采用一种新模式吧,先画图,然后源码,然后解读。
图片看起来很简单,Client发送Http请求给Server端,Server端返回全量的注册表信息给Client端。接下来就是跟进代码一步步分析,这里先有个大概印象
源码解析Client端发送获取全量注册表请求
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // 省略很多无关代码 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } } private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // 删减掉一些代码 // registry was fetched successfully, so return true return true; } private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }