【Soul网关探秘】http数据同步-Web端处理变更通知

不同数据变更的通知机制应当是一致的,故本篇以 selector 配置变更通知为切入点进行深入。

通知处理入口

上回我们说到 HttpSyncDataService 的 doLongPolling,在其内部发起通知订阅并接收响应通知:

private void doLongPolling(final String server) { ... String listenerUrl = server + "/configs/listener"; ... try { // 发起监听请求 String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody(); log.debug("listener result: [{}]", json); groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data"); } catch (RestClientException e) { ... } // 处理变更通知 if (groupJson != null) { // fetch group configuration async. ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class); if (ArrayUtils.isNotEmpty(changedGroups)) { log.info("Group config changed: {}", Arrays.toString(changedGroups)); // 获取组配置 this.doFetchGroupConfig(server, changedGroups); } } }

在收到变更通知时,若存在配置组变更,则按变更组获取相应配置。

获取配置

获取组配置处理如下:

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) { ... String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&"); ... try { json = this.httpClient.getForObject(url, String.class); } catch (RestClientException e) { ... } // update local cache boolean updated = this.updateCacheWithJson(json); ... }

内部发起配置获取请求并更新本地缓存。

更新配置组缓存

由 HttpSyncDataService 实现本地缓存更新:

private boolean updateCacheWithJson(final String json) { JsonObject jsonObject = GSON.fromJson(json, JsonObject.class); JsonObject data = jsonObject.getAsJsonObject("data"); // if the config cache will be updated? return factory.executor(data); }

转成 Json 对象后交由 DataRefreshFactory 进行处理。

DataRefreshFactory 处理如下:

public boolean executor(final JsonObject data) { final boolean[] success = {false}; ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data)); return success[0]; }

调用相应数据刷新类刷新数据。

统一由 AbstractDataRefresh 的 refresh 进行处理:

public Boolean refresh(final JsonObject data) { boolean updated = false; JsonObject jsonObject = convert(data); if (null != jsonObject) { ConfigData<T> result = fromJson(jsonObject); if (this.updateCacheIfNeed(result)) { updated = true; refresh(result.getData()); } } return updated; }

先更新本地缓存,再调用子类实现的 refresh。

此处的更新本地缓存处理,由子类 SelectorDataRefresh 的 updateCacheIfNeed 实现:

protected boolean updateCacheIfNeed(final ConfigData<SelectorData> result) { return updateCacheIfNeed(result, ConfigGroupEnum.SELECTOR); }

向父类 AbstractDataRefresh 的 updateCacheIfNeed 指定更新 selector 配置组。

父类 AbstractDataRefresh 的 updateCacheIfNeed 处理:

protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) { // 首次初始化缓存 if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) { return true; } ResultHolder holder = new ResultHolder(false); GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> { // 必须比较最后更新时间 if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) { ... holder.result = true; return newVal; } ... return oldVal; }); return holder.result; }

通过比较新老缓存的 MD5 值来判定是否发生变更,存在变更则更新本地缓存(注意还有最后更新时间判定)。

处理刷新事件

SelectorDataRefresh 的 refresh 实现:

protected void refresh(final List<SelectorData> data) { if (CollectionUtils.isEmpty(data)) { log.info("clear all selector cache, old cache"); data.forEach(pluginDataSubscriber::unSelectorSubscribe); pluginDataSubscriber.refreshSelectorDataAll(); } else { // update cache for UpstreamCacheManager pluginDataSubscriber.refreshSelectorDataAll(); data.forEach(pluginDataSubscriber::onSelectorSubscribe); } }

若最新数据为空,则循环取消订阅并刷新所有选择器数据,实际是清空选择器缓存。

若最新数据不为空,则刷新所有选择器数据并循环响应选择器订阅事件处理,实际是更新上游服务缓存。

取消订阅

CommonPluginDataSubscriber 实现订阅取消:

public void unSelectorSubscribe(final SelectorData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE); }

subscribeDataHandler 对 selectorData 的 delete 处理:

private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { if (data instanceof PluginData) { ... } else if (data instanceof SelectorData) { SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { ... } else if (dataType == DataEventTypeEnum.DELETE) { BaseDataCache.getInstance().removeSelectData(selectorData); Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData)); } } else if (data instanceof RuleData) { ... } }); }

从 BaseDataCache 删除目标选择器数据,并移除选择器。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxwgw.html