2. Sentinel源码分析—Sentinel是如何进行流量统计的?

这一篇我还是继续上一篇没有讲完的内容,先上一个例子:

private static final int threadCount = 100; public static void main(String[] args) { initFlowRule(); for (int i = 0; i < threadCount; i++) { Thread entryThread = new Thread(new Runnable() { @Override public void run() { while (true) { Entry methodA = null; try { TimeUnit.MILLISECONDS.sleep(5); methodA = SphU.entry("methodA"); } catch (BlockException e1) { // Block exception } catch (Exception e2) { // biz exception } finally { if (methodA != null) { methodA.exit(); } } } } }); entryThread.setName("working thread"); entryThread.start(); } } private static void initFlowRule() { List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new FlowRule(); rule1.setResource("methodA"); // set limit concurrent thread for 'methodA' to 20 rule1.setCount(20); rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD); rule1.setLimitApp("default"); rules.add(rule1); FlowRuleManager.loadRules(rules); } SphU#entry

我先把例子放上来

Entry methodA = null; try { methodA = SphU.entry("methodA"); // dosomething } catch (BlockException e1) { block.incrementAndGet(); } catch (Exception e2) { // biz exception } finally { total.incrementAndGet(); if (methodA != null) { methodA.exit(); } }

我们先进入到entry方法里面:
SphU#entry

public static Entry entry(String name) throws BlockException { return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); }

这个方法里面会调用Env的sph静态方法,我们进入到Env里面看看

public class Env { public static final Sph sph = new CtSph(); static { // If init fails, the process will exit. InitExecutor.doInit(); } }

这个方法初始化的时候会调用InitExecutor.doInit()
InitExecutor#doInit

public static void doInit() { //InitExecutor只会初始化一次,并且初始化失败会退出 if (!initialized.compareAndSet(false, true)) { return; } try { //通过spi加载InitFunc子类,默认是MetricCallbackInit ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class); List<OrderWrapper> initList = new ArrayList<OrderWrapper>(); for (InitFunc initFunc : loader) { RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName()); //由于这里只有一个loader里面只有一个子类,那么直接就返回initList里面包含一个元素的集合 insertSorted(initList, initFunc); } for (OrderWrapper w : initList) { //这里调用MetricCallbackInit的init方法 w.func.init(); RecordLog.info(String.format("[InitExecutor] Executing %s with order %d", w.func.getClass().getCanonicalName(), w.order)); } } catch (Exception ex) { RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex); ex.printStackTrace(); } catch (Error error) { RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error); error.printStackTrace(); } }

这个方法主要是通过spi加载InitFunc 的子类,默认是MetricCallbackInit。
然后会将MetricCallbackInit封装成OrderWrapper实例,然后遍历,调用
MetricCallbackInit的init方法:

MetricCallbackInit#init

public void init() throws Exception { //添加回调函数 //key是com.alibaba.csp.sentinel.metric.extension.callback.MetricEntryCallback StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(), new MetricEntryCallback()); //key是com.alibaba.csp.sentinel.metric.extension.callback.MetricExitCallback StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(), new MetricExitCallback()); }

这个init方法就是注册了两个回调实例MetricEntryCallback和MetricExitCallback。

然后会通过调用Env.sph.entry会最后调用到CtSph的entry方法:

public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { //这里name是Resource,type是out StringResourceWrapper resource = new StringResourceWrapper(name, type); //count是1 ,args是一个空数组 return entry(resource, count, args); }

这个方法会将resource和type封装成StringResourceWrapper实例,然后调用entry重载方法追踪到CtSph的entryWithPriority方法。

//这里传入得参数count是1,prioritized=false,args是容量为1的空数组 private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { //获取当前线程的上下文 Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } //为空的话,创建一个默认的context if (context == null) { //1 // Using default context. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); } // Global switch is close, no rule checking will do. if (!Constants.ON) {//这里会返回false return new CtEntry(resourceWrapper, null, context); } //2 //创建一系列功能插槽 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ //如果超过了插槽的最大数量,那么会返回null if (chain == null) { return new CtEntry(resourceWrapper, null, context); } Entry e = new CtEntry(resourceWrapper, chain, context); try { //3 //调用责任链 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }

这个方法是最核心的方法,主要做了三件事:

如果context为null则创建一个新的

通过责任链方式创建功能插槽

调用责任链插槽

在讲创建context之前我们先看一下ContextUtil这个类初始化的时候会做什么

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwfdp.html