6.Sentinel源码分析—Sentinel是如何动态加载配置限流的? (3)

这个方法实际上就是添加了一个监听器,然后将FlowRuleManager的currentProperty替换成flowRuleDataSource创建的property。然后flowRuleDataSource里面的定时线程会每隔3秒钟调用一下这个LISTENER的configUpdate方法进行刷新规则,这样就实现了动态更新规则。

Push-based:ZooKeeper

我们还是先给出一个例子:

public static void main(String[] args) { final String remoteAddress = "127.0.0.1:2181"; final String path = "/Sentinel-Demo/SYSTEM-CODE-DEMO-FLOW"; ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<>(remoteAddress, path, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})); FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); }

在这里我定义了/Sentinel-Demo/SYSTEM-CODE-DEMO-FLOW这个path,如果这个path内的内容发生了变化,那么就会刷新规则。

我们先看一下ZookeeperDataSource的继承关系:

6.Sentinel源码分析—Sentinel是如何动态加载配置限流的?

ZookeeperDataSource

public ZookeeperDataSource(final String serverAddr, final String path, Converter<String, T> parser) { super(parser); if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) { throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]", serverAddr, path)); } this.path = path; init(serverAddr, null); }

AbstractDataSource

public AbstractDataSource(Converter<S, T> parser) { if (parser == null) { throw new IllegalArgumentException("parser can't be null"); } this.parser = parser; this.property = new DynamicSentinelProperty<T>(); }

ZookeeperDataSource首先会调用父类进行参数的设置,在校验完之后调用init方法进行初始化。

ZookeeperDataSource#init

private void init(final String serverAddr, final List<AuthInfo> authInfos) { initZookeeperListener(serverAddr, authInfos); loadInitialConfig(); }

ZookeeperDataSource#initZookeeperListener

private void initZookeeperListener(final String serverAddr, final List<AuthInfo> authInfos) { try { //设置监听 this.listener = new NodeCacheListener() { @Override public void nodeChanged() { try { T newValue = loadConfig(); RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s", serverAddr, path, newValue)); // Update the new value to the property. getProperty().updateValue(newValue); } catch (Exception ex) { RecordLog.warn("[ZookeeperDataSource] loadConfig exception", ex); } } }; String zkKey = getZkKey(serverAddr, authInfos); if (zkClientMap.containsKey(zkKey)) { this.zkClient = zkClientMap.get(zkKey); } else { //如果key不存在,那么就加锁设值 synchronized (lock) { if (!zkClientMap.containsKey(zkKey)) { CuratorFramework zc = null; //根据不同的条件获取client if (authInfos == null || authInfos.size() == 0) { zc = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)); } else { zc = CuratorFrameworkFactory.builder(). connectString(serverAddr). retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)). authorization(authInfos). build(); } this.zkClient = zc; this.zkClient.start(); Map<String, CuratorFramework> newZkClientMap = new HashMap<>(zkClientMap.size()); newZkClientMap.putAll(zkClientMap); newZkClientMap.put(zkKey, zc); zkClientMap = newZkClientMap; } else { this.zkClient = zkClientMap.get(zkKey); } } } //为节点添加watcher //监听数据节点的变更,会触发事件 this.nodeCache = new NodeCache(this.zkClient, this.path); this.nodeCache.getListenable().addListener(this.listener, this.pool); this.nodeCache.start(); } catch (Exception e) { RecordLog.warn("[ZookeeperDataSource] Error occurred when initializing Zookeeper data source", e); e.printStackTrace(); } }

这个方法主要就是用来创建client和设值监听,都是zk的常规操作,不熟悉的,可以去看看Curator是怎么使用的。

private void loadInitialConfig() { try { //调用父类的loadConfig方法 T newValue = loadConfig(); if (newValue == null) { RecordLog.warn("[ZookeeperDataSource] WARN: initial config is null, you may have to check your data source"); } getProperty().updateValue(newValue); } catch (Exception ex) { RecordLog.warn("[ZookeeperDataSource] Error when loading initial config", ex); } }

设值完zk的client和监听后会调用一次updateValue,首次加载节点的信息。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwdpz.html