冷饭新炒:理解断路器CircuitBreaker的原理与实现 (3)

测试结果输出如下:

进入process方法,number:0 返回结果:null,number:0 进入process方法,number:1 返回结果:null,number:1 进入process方法,number:2 返回结果:null,number:2 进入process方法,number:3 返回结果:null,number:3 进入process方法,number:4 SimpleCircuitBreaker状态转换,[CLOSED]->[OPEN] 返回结果:null,number:4 返回结果:null,number:5 返回结果:null,number:6 返回结果:null,number:7 返回结果:null,number:8 返回结果:null,number:9

细心的伙伴会发现,基本上状态的维护和变更和数据统计都位于调用异常或者失败的方法入口以及最后的finally代码块,在真实的调用逻辑前一般只会做状态判断或者下文提到的分配调用资源等。

基于异常阈值并且能够自恢复的实现

基于异常阈值、能够自恢复的CircuitBreaker实现需要引入Half_Open状态,同时需要记录最后一次失败调用的时间戳以及reset_timeout(断路器的当前的系统时间戳减去上一阶段最后一次失败调用的时间差,大于某个值的时候,并且当前的失败调用大于失败阈值则需要把状态重置为Half_Open,这里的"某个值"定义为reset_timeout),示意图如下:

冷饭新炒:理解断路器CircuitBreaker的原理与实现

假设当前的调用为圆形6,当前系统时间戳减去(上一轮)最后一个失败调用(圆形5)的时间戳大于预设的reset_timeout的时候,不论当次调用是成功还是失败,直到下一次调用失败或者失败调用数降低到转换为Closed状态之前,都处于Half_Open状态,会对单个调用进行放行(并发场景下也有可能同时放行多个调用)。代码实现如下:

// 添加一个Monitor用于记录状态变更 public enum CircuitBreakerStatusMonitor { /** * 单例 */ X; public void report(String name, CircuitBreakerStatus o, CircuitBreakerStatus n) { System.out.println(String.format("断路器[%s]状态变更,[%s]->[%s]", name, o, n)); } public void reset(String name) { System.out.println(String.format("断路器[%s]重置", name)); } } @Getter public class RestCircuitBreaker { private final long failureThreshold; private final long resetTimeout; private LongAdder failureCounter; private LongAdder callCounter; private AtomicReference<CircuitBreakerStatus> status; private final Object fallback = null; /** * 最后一次调用失败的时间戳 */ private long lastFailureTime; public RestCircuitBreaker(long failureThreshold, long resetTimeout) { this.failureThreshold = failureThreshold; this.resetTimeout = resetTimeout; reset(); } public void reset() { CircuitBreakerStatusMonitor.X.reset("RestCircuitBreaker"); this.callCounter = new LongAdder(); this.failureCounter = new LongAdder(); this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED); this.lastFailureTime = -1L; } @SuppressWarnings("unchecked") public <T> T call(Supplier<T> supplier) { try { if (shouldAllowExecution()) { T result = supplier.get(); markSuccess(); return result; } } catch (Exception e) { markNoneSuccess(); } finally { this.callCounter.increment(); } return (T) fallback; } public void call(Runnable runnable) { call(() -> { runnable.run(); return null; }); } boolean shouldAllowExecution() { // 本质是Closed状态 if (lastFailureTime == -1L) { return true; } // 没到达阈值 if (failureThreshold > failureCounter.sum()) { return true; } return shouldTryAfterRestTimeoutWindow() && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN); } boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) { boolean r = status.compareAndSet(o, n); if (r) { CircuitBreakerStatusMonitor.X.report("RestCircuitBreaker", o, n); } return r; } boolean shouldTryAfterRestTimeoutWindow() { long lastFailureTimeSnap = lastFailureTime; long currentTime = System.currentTimeMillis(); return currentTime > lastFailureTimeSnap + resetTimeout; } public void markSuccess() { if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) { reset(); } } public void markNoneSuccess() { this.failureCounter.increment(); if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) { this.lastFailureTime = System.currentTimeMillis(); } if (this.failureCounter.sum() >= failureThreshold && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) { this.lastFailureTime = System.currentTimeMillis(); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygpyx.html