1. Sentinel Source Code Analysis: What Does FlowRuleManager Do When Loading Rules?

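Recently I got curious about how rate limiting, circuit breaking, and degradation are implemented for RPC. Hystrix has gone more than a year without an update and looks like it is being abandoned, so I turned my attention to Alibaba's Sentinel, which is also a good excuse to study Alibaba's source code.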

This chapter covers what FlowRuleManager does when it loads a FlowRule. The next one covers how Sentinel actually controls the number of concurrent threads.

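Below is a simplified demo. It enters the resource from a single thread only; I want to make the process clear first and cover the multi-threaded version afterwards.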

The demo initializes a flow control rule that limits methodA to 20 concurrent threads:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.util.TimeUtil;

public class FlowThreadDemo {

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();
    private static AtomicInteger activeThread = new AtomicInteger();

    private static volatile boolean stop = false;
    // Not used in this single-threaded version of the demo
    private static final int threadCount = 100;

    private static int seconds = 60 + 40;
    private static volatile int methodBRunningTime = 2000;

    public static void main(String[] args) throws Exception {
        System.out.println(
            "MethodA will call methodB. After running for a while, methodB becomes fast, "
                + "which make methodA also become fast ");
        tick();
        initFlowRule();

        Entry methodA = null;
        try {
            TimeUnit.MILLISECONDS.sleep(5);
            methodA = SphU.entry("methodA");
            activeThread.incrementAndGet();
            //Entry methodB = SphU.entry("methodB");
            TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
            //methodB.exit();
            pass.addAndGet(1);
        } catch (BlockException e1) {
            block.incrementAndGet();
        } catch (Exception e2) {
            // biz exception
        } finally {
            total.incrementAndGet();
            if (methodA != null) {
                methodA.exit();
                activeThread.decrementAndGet();
            }
        }
    }

    private static void initFlowRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource("methodA");
        // set limit concurrent thread for 'methodA' to 20
        rule1.setCount(20);
        rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        rule1.setLimitApp("default");

        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;

            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " total qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                    + ", pass:" + oneSecondPass
                    + ", block:" + oneSecondBlock
                    + " activeThread:" + activeThread.get());
                if (seconds-- <= 0) {
                    stop = true;
                }
                if (seconds == 40) {
                    System.out.println("method B is running much faster; more requests are allowed to pass");
                    methodBRunningTime = 20;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                + ", block:" + block.get());
            System.exit(0);
        }
    }
}

FlowRuleManager

Before the demo enters methodA, it calls FlowRuleManager#loadRules to register the rule.
Let's look at the rule configuration code first:

private static void initFlowRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource("methodA");
    // set limit concurrent thread for 'methodA' to 20
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
    rule1.setLimitApp("default");

    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

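This code first defines a flow control rule (resource name, threshold, grade, and the calling app it applies to), then calls loadRules to register it.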
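To make "registering" a little more concrete: FlowRuleManager keeps its rules in a Map<String, List<FlowRule>> (the flowRules field listed later in this article), so a flat list of rules ends up grouped by resource name. The following is only a minimal, stdlib-only sketch of that grouping idea. SimpleRule and RuleGroupingSketch are hypothetical names invented for illustration, not Sentinel classes, and Sentinel's real code does more (validation, sorting, controller setup).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for FlowRule; only the fields the demo sets are modeled.
class SimpleRule {
    final String resource;
    final double count;

    SimpleRule(String resource, double count) {
        this.resource = resource;
        this.count = count;
    }
}

class RuleGroupingSketch {

    // Groups a flat rule list by resource name, producing the same shape
    // as a Map<String, List<FlowRule>>. This is an assumption-level sketch,
    // not Sentinel's actual implementation.
    static Map<String, List<SimpleRule>> groupByResource(List<SimpleRule> rules) {
        Map<String, List<SimpleRule>> grouped = new ConcurrentHashMap<>();
        for (SimpleRule rule : rules) {
            if (rule == null || rule.resource == null) {
                continue; // skip rules that cannot be keyed
            }
            grouped.computeIfAbsent(rule.resource, key -> new ArrayList<>()).add(rule);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<SimpleRule> rules = new ArrayList<>();
        rules.add(new SimpleRule("methodA", 20));
        rules.add(new SimpleRule("methodA", 50));
        rules.add(new SimpleRule("methodB", 10));

        Map<String, List<SimpleRule>> grouped = groupByResource(rules);
        System.out.println(grouped.get("methodA").size()); // prints 2
    }
}
```

With this shape, looking up the rules for a resource such as "methodA" is a single map access, which is presumably why the rules are keyed by resource name.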

FlowRuleManager initialization

The FlowRuleManager class has several static fields:

// Rule collection
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
// Listener
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
// Property used to detect whether the configuration has changed
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

// Create a scheduled thread pool with a single thread
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
    new NamedThreadFactory("sentinel-metrics-record-task", true));

static {
    // Register the listener
    currentProperty.addListener(LISTENER);
    // Call MetricTimerListener's run method once every second
    SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}
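The scheduling part of that static block can be tried in isolation. Below is a small sketch using only the JDK: a single-threaded scheduled pool whose thread is named and marked as a daemon, running a task at a fixed rate. FixedRateSketch and daemonFactory are hypothetical names, and daemonFactory is only my guess at what a NamedThreadFactory called with `true` provides (a named daemon thread); the period is shortened to milliseconds so the example finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateSketch {

    // Hypothetical substitute for a named, daemon thread factory.
    static ThreadFactory daemonFactory(String name) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // daemon threads do not keep the JVM alive
            return thread;
        };
    }

    // Schedules a counting task at a fixed rate, waits until it has run at
    // least `ticks` times, then shuts the pool down.
    // Returns the number of runs observed, or -1 on timeout or interruption.
    static int runTicks(int ticks, long periodMillis) {
        ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1, daemonFactory("sketch-timer-task"));
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(ticks);
        try {
            // Initial delay 0, then one run per period, like the static block.
            scheduler.scheduleAtFixedRate(() -> {
                counter.incrementAndGet();
                latch.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            boolean finished = latch.await(ticks * periodMillis + 5000, TimeUnit.MILLISECONDS);
            return finished ? counter.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs observed: " + runTicks(3, 50));
    }
}
```

Because the worker is a daemon thread, a scheduler like this does not by itself prevent the JVM from exiting, which suits a background metrics task.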

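When the class is initialized, all of these static variables are assigned and the static block runs, so the listener is registered and the scheduled task is started before any rule is loaded.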
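The property-plus-listener arrangement is a plain observer pattern: a property holds the current value, and registered listeners are called when the value changes. The sketch below illustrates that idea with JDK classes only. ValueListener and SimpleProperty are hypothetical names, and the method names addListener, updateValue, and configUpdate are chosen to resemble Sentinel's but are my assumptions here; the real DynamicSentinelProperty and FlowPropertyListener may behave differently in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical listener interface: called whenever the property value changes.
interface ValueListener<T> {
    void configUpdate(T newValue);
}

// Hypothetical, simplified property that notifies listeners on change.
class SimpleProperty<T> {
    private final Set<ValueListener<T>> listeners = new CopyOnWriteArraySet<>();
    private T value;

    void addListener(ValueListener<T> listener) {
        listeners.add(listener);
    }

    // Stores the new value and notifies every listener.
    // Returns false (and notifies nobody) if the value did not change.
    boolean updateValue(T newValue) {
        if (Objects.equals(value, newValue)) {
            return false;
        }
        value = newValue;
        for (ValueListener<T> listener : listeners) {
            listener.configUpdate(newValue);
        }
        return true;
    }
}

class PropertyListenerSketch {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        SimpleProperty<String> property = new SimpleProperty<>();
        property.addListener(seen::add);

        property.updateValue("rules-v1"); // listener is notified
        property.updateValue("rules-v1"); // unchanged value, no notification
        System.out.println(seen.size()); // prints 1
    }
}
```

The point of the indirection is that whoever supplies new rules only touches the property, while the code that rebuilds internal state lives in the listener.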

A lot happens when the MetricTimerListener instance is created, so let me go through it step by step.


When reposting, please cite the source: https://www.heiqu.com/wpzwzz.html