这一篇我还是继续上一篇没有讲完的内容,先上一个例子:
private static final int threadCount = 100; public static void main(String[] args) { initFlowRule(); for (int i = 0; i < threadCount; i++) { Thread entryThread = new Thread(new Runnable() { @Override public void run() { while (true) { Entry methodA = null; try { TimeUnit.MILLISECONDS.sleep(5); methodA = SphU.entry("methodA"); } catch (BlockException e1) { // Block exception } catch (Exception e2) { // biz exception } finally { if (methodA != null) { methodA.exit(); } } } } }); entryThread.setName("working thread"); entryThread.start(); } } private static void initFlowRule() { List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new FlowRule(); rule1.setResource("methodA"); // set limit concurrent thread for 'methodA' to 20 rule1.setCount(20); rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD); rule1.setLimitApp("default"); rules.add(rule1); FlowRuleManager.loadRules(rules); } SphU#entry我先把例子放上来
Entry methodA = null; try { methodA = SphU.entry("methodA"); // dosomething } catch (BlockException e1) { block.incrementAndGet(); } catch (Exception e2) { // biz exception } finally { total.incrementAndGet(); if (methodA != null) { methodA.exit(); } }我们先进入到entry方法里面:
SphU#entry
这个方法里面会调用Env的sph静态方法,我们进入到Env里面看看
public class Env { public static final Sph sph = new CtSph(); static { // If init fails, the process will exit. InitExecutor.doInit(); } }这个方法初始化的时候会调用InitExecutor.doInit()
InitExecutor#doInit
这个方法主要是通过spi加载InitFunc 的子类,默认是MetricCallbackInit。
然后会将MetricCallbackInit封装成OrderWrapper实例,然后遍历,调用
MetricCallbackInit的init方法:
MetricCallbackInit#init
public void init() throws Exception { //添加回调函数 //key是com.alibaba.csp.sentinel.metric.extension.callback.MetricEntryCallback StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(), new MetricEntryCallback()); //key是com.alibaba.csp.sentinel.metric.extension.callback.MetricExitCallback StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(), new MetricExitCallback()); }这个init方法就是注册了两个回调实例MetricEntryCallback和MetricExitCallback。
然后会通过调用Env.sph.entry会最后调用到CtSph的entry方法:
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { //这里name是Resource,type是out StringResourceWrapper resource = new StringResourceWrapper(name, type); //count是1 ,args是一个空数组 return entry(resource, count, args); }这个方法会将resource和type封装成StringResourceWrapper实例,然后调用entry重载方法追踪到CtSph的entryWithPriority方法。
//这里传入得参数count是1,prioritized=false,args是容量为1的空数组 private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { //获取当前线程的上下文 Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } //为空的话,创建一个默认的context if (context == null) { //1 // Using default context. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); } // Global switch is close, no rule checking will do. if (!Constants.ON) {//这里会返回false return new CtEntry(resourceWrapper, null, context); } //2 //创建一系列功能插槽 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ //如果超过了插槽的最大数量,那么会返回null if (chain == null) { return new CtEntry(resourceWrapper, null, context); } Entry e = new CtEntry(resourceWrapper, chain, context); try { //3 //调用责任链 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }这个方法是最核心的方法,主要做了三件事:
如果context为null则创建一个新的
通过责任链方式创建功能插槽
调用责任链插槽
在讲创建context之前我们先看一下ContextUtil这个类初始化的时候会做什么