这里是起了一个调度任务,会去定时比较一级和二级缓存是否一致,如果不一致 就会用二级缓存覆盖一级缓存。这就回答了上面的第一个问题,两级缓存一致性的问题,默认30s执行一次。所以这里仍会有问题,可能缓存在30s内会存在不一致的情况,这里用的是最终一致的思想。
紧接着 读写缓存获取到数据后再去回写只读缓存,这是上面ResponseCacheImpl.java 的逻辑,到了这里 全量抓取注册表的代码都已经看完了,这里主要的亮点是使用了两级缓存策略来返回对应的数据。
接着整理下过期的几个机制,也是回应上面抛出的第二个问题。
用一张图作为总结:
主动过期
readWriteCacheMap,读写缓存
有新的服务实例发生注册、下线、故障的时候,就会去刷新readWriteCacheMap(在Client注册的时候,AbstractInstanceRegistry中register方法最后会有一个invalidateCache()方法)
比如说现在有一个服务A,ServiceA,有一个新的服务实例,Instance010来注册了,注册完了之后,其实必须是得刷新这个缓存的,然后就会调用ResponseCache.invalidate(),将之前缓存好的ALL_APPS这个key对应的缓存,给他过期掉
将readWriteCacheMap中的ALL_APPS缓存key,对应的缓存给过期掉
定时过期
readWriteCacheMap在构建的时候,指定了一个自动过期的时间,默认值就是180秒,所以你往readWriteCacheMap中放入一个数据过后,自动会等180秒过后,就将这个数据给他过期了
被动过期
readOnlyCacheMap怎么过期呢?
默认是每隔30秒,执行一个定时调度的线程任务,TimerTask,有一个逻辑,会每隔30秒,对readOnlyCacheMap和readWriteCacheMap中的数据进行一个比对,如果两块数据是不一致的,那么就将readWriteCacheMap中的数据放到readOnlyCacheMap中来。
比如说readWriteCacheMap中,ALL_APPS这个key对应的缓存没了,那么最多30秒过后,就会同步到readOnelyCacheMap中去。
client端增量抓取注册表逻辑上面抓取全量注册表的代码已经说了,这里来讲一下增量抓取,入口还是在DiscoverClient.java
中,当初始化完DiscoverClient.java 后会执行一个初始化定时任务的方法initScheduledTasks(), 其中这个里面就会每隔30s 增量抓取一次注册表信息。
这里就不跟着这里的逻辑一步步看了,看过上面的代码后 应该会对这里比较清晰了,这里我们直接看Server端代码了。
还记的我们上面插过的眼,获取全量用的是ALL_APPS 增量用的是ALL_APPS_DELTA, 所以我们这里只看增量的逻辑就行了。
else if (ALL_APPS_DELTA.equals(key.getName())) { if (isRemoteRegionRequested) { tracer = serializeDeltaAppsWithRemoteRegionTimer.start(); versionDeltaWithRegions.incrementAndGet(); versionDeltaWithRegionsLegacy.incrementAndGet(); payload = getPayLoad(key, registry.getApplicationDeltasFromMultipleRegions(key.getRegions())); } else { tracer = serializeDeltaAppsTimer.start(); versionDelta.incrementAndGet(); versionDeltaLegacy.incrementAndGet(); payload = getPayLoad(key, registry.getApplicationDeltas()); } }上面只是截取了部分代码,这里直接看主要的逻辑registry.getApplicationDeltasFromMultipleRegions即可,这个和全量的方法名只有一个Deltas的区别。
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) { if (null == remoteRegions) { remoteRegions = allKnownRemoteRegions; // null means all remote regions. } boolean includeRemoteRegion = remoteRegions.length != 0; if (includeRemoteRegion) { GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment(); } else { GET_ALL_CACHE_MISS_DELTA.increment(); } Applications apps = new Applications(); apps.setVersion(responseCache.getVersionDeltaWithRegions().get()); Map<String, Application> applicationInstancesMap = new HashMap<String, Application>(); try { write.lock(); Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator(); logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size()); while (iter.hasNext()) { Lease<InstanceInfo> lease = iter.next().getLeaseInfo(); InstanceInfo instanceInfo = lease.getHolder(); logger.debug("The instance id {} is found with status {} and actiontype {}", instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name()); Application app = applicationInstancesMap.get(instanceInfo.getAppName()); if (app == null) { app = new Application(instanceInfo.getAppName()); applicationInstancesMap.put(instanceInfo.getAppName(), app); apps.addApplication(app); } app.addInstance(new InstanceInfo(decorateInstanceInfo(lease))); } if (includeRemoteRegion) { for (String remoteRegion : remoteRegions) { RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion); if (null != remoteRegistry) { Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas(); if (null != remoteAppsDelta) { for (Application application : remoteAppsDelta.getRegisteredApplications()) { if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) { Application appInstanceTillNow = apps.getRegisteredApplications(application.getName()); if (appInstanceTillNow == null) { appInstanceTillNow = new Application(application.getName()); apps.addApplication(appInstanceTillNow); } for (InstanceInfo instanceInfo : application.getInstances()) { appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo)); } } } } } } } Applications allApps = getApplicationsFromMultipleRegions(remoteRegions); apps.setAppsHashCode(allApps.getReconcileHashCode()); return apps; } finally { write.unlock(); } }这里代码还是比较多的,我们只需要抓住重点即可:
从recentlyChangedQueue中获取注册信息,从名字可以看出来 这是最近改变的client注册信息的队列
使用writeLock,因为这里是获取增量注册信息,是从队列中获取,如果不加写锁,那么获取的时候又有新数据加入队列中,新数据会获取不到的
基于上面第一点,我们来看看这个队列怎么做的:
数据结构:ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue
AbstractInstanceRegistry.java初始化的时候会启动一个定时任务,默认30s中执行一次。如果注册时间小于当前时间的180s,就会放到这个队列中