2. Sentinel源码分析—Sentinel是如何进行流量统计的? (3)

SlotChainProvider#newSlotChain

public static ProcessorSlotChain newSlotChain() { if (slotChainBuilder != null) { return slotChainBuilder.build(); } //根据spi初始化slotChainBuilder,默认是DefaultSlotChainBuilder resolveSlotChainBuilder(); if (slotChainBuilder == null) { RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } return slotChainBuilder.build(); }

默认调用DefaultSlotChainBuilder的build方法进行初始化

DefaultSlotChainBuilder#build

public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); //创建Node节点 chain.addLast(new NodeSelectorSlot()); //用于构建资源的 ClusterNode chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); //用于统计实时的调用数据 chain.addLast(new StatisticSlot()); //用于对入口的资源进行调配 chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); //用于限流 chain.addLast(new FlowSlot()); //用于降级 chain.addLast(new DegradeSlot()); return chain; }

DefaultProcessorSlotChain里面会创建一个头节点,然后把其他节点通过addLast串成一个链表:

2. Sentinel源码分析—Sentinel是如何进行流量统计的?

最后我们再回到CtSph的entryWithPriority方法中,往下走调用chain.entry方法触发调用链。

Context

在往下看Slot插槽之前,我们先总结一下Context是怎样的一个结构:

2. Sentinel源码分析—Sentinel是如何进行流量统计的?

在Sentinel中,所有的统计操作都是基于context来进行的。context会通过ContextUtil的trueEnter方法进行创建,会根据context的不同的name来组装不同的Node来实现数据的统计。

在经过NodeSelectorSlot的时候会根据传入的不同的context的name字段来获取不同的DefaultNode对象,然后设置到context的curEntry实例的curNode属性中。

NodeSelectorSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { DefaultNode node = map.get(context.getName()); if (node == null) { synchronized (this) { node = map.get(context.getName()); if (node == null) { node = new DefaultNode(resourceWrapper, null); HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; // Build invocation tree ((DefaultNode) context.getLastNode()).addChild(node); } } } //设置到context的curEntry实例的curNode属性中 context.setCurNode(node); fireEntry(context, resourceWrapper, node, count, prioritized, args); }

然后再经过ClusterBuilderSlot槽位在初始化的时候会初始化一个静态的全局clusterNodeMap用来记录所有的ClusterNode,维度是ResourceWrapper。每次调用entry方法的时候会先去全局的clusterNodeMap,找不到就会创建一个新的clusterNode,放入到node的ClusterNode属性中,用来统计ResourceWrapper维度下面的所有数据。

//此变量是静态的,所以只会初始化一次,存有所有的ResourceWrapper维度下的数据 private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>(); public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { if (clusterNode == null) { synchronized (lock) { if (clusterNode == null) { // Create the cluster node. clusterNode = new ClusterNode(); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16)); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } node.setClusterNode(clusterNode); if (!"".equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } fireEntry(context, resourceWrapper, node, count, prioritized, args); } StatisticSlot public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { //先直接往下调用,如果没有报错则进行统计 // Do some checking. fireEntry(context, resourceWrapper, node, count, prioritized, args); //当前线程数加1 // Request passed, add thread count and pass count. node.increaseThreadNum(); //通过的请求加上count node.addPassRequest(count); ... } catch (PriorityWaitException ex) { node.increaseThreadNum(); ... } catch (BlockException e) { //设置错误信息 // Blocked, set block exception to current entry. context.getCurEntry().setError(e); ... //设置被阻塞的次数 // Add block count. node.increaseBlockQps(count); ... throw e; } catch (Throwable e) { // Unexpected error, set error to current entry. context.getCurEntry().setError(e); //设置异常的次数 // This should not happen. node.increaseExceptionQps(count); ... throw e; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwfdp.html