最近我很好奇在RPC中限流熔断降级要怎么做,hystrix已经1年多没有更新了,感觉要被遗弃的感觉,那么我就把眼光聚焦到了阿里的Sentinel,顺便学习一下阿里的源代码。
这一章我主要讲的是FlowRuleManager在加载FlowRule的时候做了什么,下一篇正式讲Sentinel如何控制并发数的。
下面我给出一个简化版的demo,这个demo只能单线程访问,先把过程讲清楚再讲多线程版本。
初始化流量控制的规则:限定20个线程并发访问
public class FlowThreadDemo { private static AtomicInteger pass = new AtomicInteger(); private static AtomicInteger block = new AtomicInteger(); private static AtomicInteger total = new AtomicInteger(); private static AtomicInteger activeThread = new AtomicInteger(); private static volatile boolean stop = false; private static final int threadCount = 100; private static int seconds = 60 + 40; private static volatile int methodBRunningTime = 2000; public static void main(String[] args) throws Exception { System.out.println( "MethodA will call methodB. After running for a while, methodB becomes fast, " + "which make methodA also become fast "); tick(); initFlowRule(); Entry methodA = null; try { TimeUnit.MILLISECONDS.sleep(5); methodA = SphU.entry("methodA"); activeThread.incrementAndGet(); //Entry methodB = SphU.entry("methodB"); TimeUnit.MILLISECONDS.sleep(methodBRunningTime); //methodB.exit(); pass.addAndGet(1); } catch (BlockException e1) { block.incrementAndGet(); } catch (Exception e2) { // biz exception } finally { total.incrementAndGet(); if (methodA != null) { methodA.exit(); activeThread.decrementAndGet(); } } } private static void initFlowRule() { List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new FlowRule(); rule1.setResource("methodA"); // set limit concurrent thread for 'methodA' to 20 rule1.setCount(20); rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD); rule1.setLimitApp("default"); rules.add(rule1); FlowRuleManager.loadRules(rules); } private static void tick() { Thread timer = new Thread(new TimerTask()); timer.setName("sentinel-timer-task"); timer.start(); } static class TimerTask implements Runnable { @Override public void run() { long start = System.currentTimeMillis(); System.out.println("begin to statistic!!!"); long oldTotal = 0; long oldPass = 0; long oldBlock = 0; while (!stop) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } long globalTotal = total.get(); long oneSecondTotal = globalTotal - oldTotal; oldTotal = globalTotal; long globalPass = pass.get(); long oneSecondPass = globalPass - oldPass; oldPass = globalPass; long globalBlock = block.get(); long oneSecondBlock = globalBlock - oldBlock; oldBlock = globalBlock; System.out.println(seconds + " total qps is: " + oneSecondTotal); System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock + " activeThread:" + activeThread.get()); if (seconds-- <= 0) { stop = true; } if (seconds == 40) { System.out.println("method B is running much faster; more requests are allowed to pass"); methodBRunningTime = 20; } } long cost = System.currentTimeMillis() - start; System.out.println("time cost: " + cost + " ms"); System.out.println("total:" + total.get() + ", pass:" + pass.get() + ", block:" + block.get()); System.exit(0); } } } FlowRuleManager在这个demo中,首先会调用FlowRuleManager#loadRules进行规则注册
我们先聊一下规则配置的代码:
这段代码里面先定义一个流量控制规则,然后调用loadRules进行注册。
FlowRuleManager初始化FlowRuleManager
FlowRuleManager 类里面有几个静态参数:
在初始化的时候会为静态变量都赋上值。
在新建MetricTimerListener实例的时候做了很多事情,容我慢慢分析。