冷饭新炒:理解断路器CircuitBreaker的原理与实现 (2)

基于此设计图,Martin Fowler大神在其文章中也给予了伪代码如下:

class ResetCircuitBreaker... // 初始化 def initialize &block @circuit = block @invocation_timeout = 0.01 @failure_threshold = 5 @monitor = BreakerMonitor.new @reset_timeout = 0.1 reset end // 重置 def reset @failure_count = 0 @last_failure_time = nil @monitor.alert :reset_circuit end // 状态维护 def state case when (@failure_count >= @failure_threshold) && (Time.now - @last_failure_time) > @reset_timeout :half_open when (@failure_count >= @failure_threshold) :open else :closed end end // 调用 def call args case state when :closed, :half_open begin do_call args // 这里从描述来看应该是漏了调用reset方法 // reset rescue Timeout::Error record_failure raise $! end when :open raise CircuitBreaker::Open else raise "Unreachable" end end // 记录失败 def record_failure @failure_count += 1 @last_failure_time = Time.now @monitor.alert(:open_circuit) if :open == state end

下面的多种实现的思路都是基于此伪代码的基本框架进行编写。

基于异常阈值不会自恢复的实现

这种实现最简单,也就是只需要维护状态Closed转向Open的临界条件即可,可以设定一个异常计数的阈值,然后使用一个原子计数器统计异常数量即可,Java代码实现如下:

// 断路器状态 public enum CircuitBreakerStatus { /** * 关闭 */ CLOSED, /** * 开启 */ OPEN, /** * 半开启 */ HALF_OPEN } @Getter public class SimpleCircuitBreaker { private final long failureThreshold; private final LongAdder failureCounter; private final LongAdder callCounter; private CircuitBreakerStatus status; public SimpleCircuitBreaker(long failureThreshold) { this.failureThreshold = failureThreshold; this.callCounter = new LongAdder(); this.failureCounter = new LongAdder(); this.status = CircuitBreakerStatus.CLOSED; } private final Object fallback = null; @SuppressWarnings("unchecked") public <T> T call(Supplier<T> supplier) { try { if (CircuitBreakerStatus.CLOSED == this.status) { return supplier.get(); } } catch (Exception e) { this.failureCounter.increment(); tryChangingStatus(); } finally { this.callCounter.increment(); } return (T) fallback; } private void tryChangingStatus() { if (this.failureThreshold <= this.failureCounter.sum()) { this.status = CircuitBreakerStatus.OPEN; System.out.println(String.format("SimpleCircuitBreaker状态转换,[%s]->[%s]", CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)); } } public void call(Runnable runnable) { call(() -> { runnable.run(); return null; }); } }

在多线程调用的前提下,如果在很短时间内有大量的线程中的方法调用出现异常,有可能所有调用都会涌进去tryChangingStatus()方法,这种情况下会导致CircuitBreaker的状态被并发修改,可以考虑使用AtomicReference包裹CircuitBreakerStatus,做CAS更新(确保只更新一次)即可。变更的代码如下:

private final AtomicReference<CircuitBreakerStatus> status; public SimpleCircuitBreaker(long failureThreshold) { ...... this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED); } public <T> T call(Supplier<T> supplier) { try { if (CircuitBreakerStatus.CLOSED == this.status.get()) { return supplier.get(); } ...... private void tryChangingStatus() { if (this.failureThreshold <= this.failureCounter.sum()) { boolean b = this.status.compareAndSet(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN); if (b) { System.out.println(String.format("SimpleCircuitBreaker状态转换,[%s]->[%s]", CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)); } } }

并发极高的场景下假设出现调用异常前提下,异常计数器failureCounter的计数值有可能在一瞬间就远超过了异常阈值failureCounter,但是一般不考虑对这些计数值的比较或者状态切换的准确时机添加同步机制(例如加锁),因为一旦加入同步机制会大大降低并发性能,这样引入断路器反而成为了性能隐患,显然是不合理的。所以一般设计断路器逻辑的时候,并不需要控制断路器状态切换的具体计数值临界点,保证状态一定切换正常即可。基于此简陋断路器编写一个同步调用的测试例子:

public static class Service { public String process(int i) { System.out.println("进入process方法,number:" + i); throw new RuntimeException(String.valueOf(i)); } } public static void main(String[] args) throws Exception { SimpleCircuitBreaker circuitBreaker = new SimpleCircuitBreaker(5L); Service service = new Service(); for (int i = 0; i < 10; i++) { int temp = i; String result = circuitBreaker.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d", result, temp)); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygpyx.html