2. Sentinel源码分析—Sentinel是如何进行流量统计的? (4)

这段代码中,我把不相关的代码都省略了,不影响我们的主流程。
在entry方法里面,首先是往下继续调用,根据其他的节点的情况来进行统计,比如抛出异常,那么就统计ExceptionQps,被阻塞那么就统计BlockQps,直接通过,那么就统计PassRequest。

我们先看一下线程数是如何统计的:node.increaseThreadNum()

DefaultNode#increaseThreadNum
我们先看一下DefaultNode的继承关系:

2. Sentinel源码分析—Sentinel是如何进行流量统计的?

public void increaseThreadNum() { super.increaseThreadNum(); this.clusterNode.increaseThreadNum(); }

所以super.increaseThreadNum是调用到了父类的increaseThreadNum方法。

this.clusterNode.increaseThreadNum()这句代码和super.increaseThreadNum是一样的使用方式,所以看看StatisticNode的increaseThreadNum方法就好了

StatisticNode#increaseThreadNum

private LongAdder curThreadNum = new LongAdder(); public void decreaseThreadNum() { curThreadNum.increment(); }

这个方法很简单,每次都直接使用LongAdder的api加1就好了,最后会在退出的时候减1,使用LongAdder也保证了原子性。

如果请求通过的时候会继续往下调用node.addPassRequest:

DefaultNode#addPassRequest

public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); }

这句代码也是调用了StatisticNode的addPassRequest方法进行统计的。

StatisticNode#addPassRequest

public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }

这段代码里面有两个调用,一个是按分钟统计的,一个是按秒统计的。因为我们这里是使用的FlowRuleManager所以是会记录按分钟统计的。具体是怎么初始化,以及怎么打印统计日志的可以看看我上一篇分析:1.Sentinel源码分析—FlowRuleManager加载规则做了什么?,我这里不再赘述。

所以我们直接看看rollingCounterInMinute.addPass(count)这句代码就好了,这句代码会直接调用ArrayMetric的addPass方法。

ArrayMetric#addPass

public void addPass(int count) { //获取当前的时间窗口 WindowWrap<MetricBucket> wrap = data.currentWindow(); //窗口内的pass加1 wrap.value().addPass(count); }

这里会首先调用currentWindow获取当前的时间窗口WindowWrap,然后调用调用窗口内的MetricBucket的addPass方法加1,我继续拿我上一篇文章的图过来说明:

2. Sentinel源码分析—Sentinel是如何进行流量统计的?

我面来到MetricBucket的addPass方法:
MetricBucket#addPass

public void addPass(int n) { add(MetricEvent.PASS, n); } public MetricBucket add(MetricEvent event, long n) { counters[event.ordinal()].add(n); return this; }

addPass方法会使用枚举类然后将counters数组内的pass槽位的值加n;counters数组是LongAdder数组,所以也不会有线程安全问题。

node.increaseBlockQps和node.increaseExceptionQps代码也是一样的,大家可以自行去看看。

FlowSlot

FlowSlot可以根据预先设置的规则来判断一个请求是否应该被通过。

FlowSlot

private final FlowRuleChecker checker; public FlowSlot() { this(new FlowRuleChecker()); } public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); }

FlowSlot在实例化的时候会设置一个规则检查器,然后在调用entry方法的时候会调用规则检查器的checkFlow方法

我们进入到FlowRuleChecker的checkFlow 方法中:
FlowRuleChecker#checkFlow

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } //返回FlowRuleManager里面注册的所有规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { //如果当前的请求不能通过,那么就抛出FlowException异常 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() { @Override public Collection<FlowRule> apply(String resource) { // Flow rule map should not be null. Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); return flowRules.get(resource); } };

checkFlow这个方法就是过去所有的规则然后根据规则进行过滤。主要的过滤操作是在canPassCheck中进行的。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwfdp.html