编写一个测试客户端RestCircuitBreakerClient:
public class RestCircuitBreakerClient { public static void main(String[] args) throws Exception { Service service = new Service(); RestCircuitBreaker cb = new RestCircuitBreaker(5, 500); for (int i = 0; i < 10; i++) { int temp = i; String result = cb.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d", result, temp)); } Thread.sleep(501L); // 故意成功 cb.call(service::processSuccess); for (int i = 0; i < 3; i++) { int temp = i; String result = cb.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d", result, temp)); } } public static class Service { public String process(int i) { System.out.println("进入process方法,number:" + i); throw new RuntimeException(String.valueOf(i)); } public void processSuccess() { System.out.println("调用processSuccess方法"); } } }输出结果如下:
断路器[RestCircuitBreaker]重置 进入process方法,number:0 返回结果:null,number:0 进入process方法,number:1 返回结果:null,number:1 进入process方法,number:2 返回结果:null,number:2 进入process方法,number:3 返回结果:null,number:3 进入process方法,number:4 断路器[RestCircuitBreaker]状态变更,[CLOSED]->[OPEN] 返回结果:null,number:4 返回结果:null,number:5 返回结果:null,number:6 返回结果:null,number:7 返回结果:null,number:8 返回结果:null,number:9 断路器[RestCircuitBreaker]状态变更,[OPEN]->[HALF_OPEN] 调用processSuccess方法 # <------ 这个位置的成功调用重置了断路器的状态 断路器[RestCircuitBreaker]状态变更,[HALF_OPEN]->[CLOSED] 断路器[RestCircuitBreaker]重置 进入process方法,number:0 返回结果:null,number:0 进入process方法,number:1 返回结果:null,number:1 进入process方法,number:2 返回结果:null,number:2

基于线程池隔离和超时控制

在使用CircuitBreaker的时候,可以基于不同的资源(唯一标识可以使用resource_key或者resource_name)创建单独的线程池,让资源基于线程池进行隔离调用。这种设计原则借鉴了运货船的船舱设计:每个船舱都使用绝缘的材料进行分隔,一旦某个船舱出现了火情,也不会蔓延到其他船舱。在Java体系中,可以先通过线程池ThreadPoolExecutor#submit(Callable<T> task)提交任务,再通过返回的Future#get(long timeout, TimeUnit unit)在指定的超时上限内获取结果,这样就可以预设一个调用超时时间上限,限制每次调用可用的最长执行时间。
首先需要设计一个轻量级的资源线程池管理模块:
// 资源配置 @Data public class CircuitBreakerResourceConf { private String resourceName; private int coreSize; private int queueSize; private long timeout; } public enum CircuitBreakerResourceManager { /** * 单例 */ X; public final Map<String, CircuitBreakerResource> cache = new ConcurrentHashMap<>(8); public void register(CircuitBreakerResourceConf conf) { cache.computeIfAbsent(conf.getResourceName(), rn -> { int coreSize = conf.getCoreSize(); int queueSize = conf.getQueueSize(); BlockingQueue<Runnable> queue; if (queueSize > 0) { queue = new ArrayBlockingQueue<>(queueSize); } else { queue = new SynchronousQueue<>(); } ThreadPoolExecutor executor = new ThreadPoolExecutor( coreSize, coreSize, 0, TimeUnit.SECONDS, queue, new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName(rn + "-CircuitBreakerWorker-" + counter.getAndIncrement()); return thread; } }, new ThreadPoolExecutor.AbortPolicy() ); CircuitBreakerResource resource = new CircuitBreakerResource(); resource.setExecutor(executor); resource.setTimeout(conf.getTimeout()); return resource; }); } public CircuitBreakerResource get(String resourceName) { return Optional.ofNullable(cache.get(resourceName)).orElseThrow(() -> new IllegalArgumentException(resourceName)); } }编写断路器ResourceCircuitBreaker的实现代码:
@Getter public class ResourceCircuitBreaker { private final long failureThreshold; private final long resetTimeout; private LongAdder failureCounter; private LongAdder callCounter; private AtomicReference<CircuitBreakerStatus> status; private final ThreadPoolExecutor executor; private final Object fallback = null; private final String circuitBreakerName; /** * 最后一次调用失败的时间戳 */ private long lastFailureTime; /** * 执行超时上限,单位毫秒 */ private final long executionTimeout; public ResourceCircuitBreaker(String resourceName, long failureThreshold, long resetTimeout) { CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName); this.circuitBreakerName = "ResourceCircuitBreaker-" + resourceName; this.executor = resource.getExecutor(); this.executionTimeout = resource.getTimeout(); this.failureThreshold = failureThreshold; this.resetTimeout = resetTimeout; reset(); } public void reset() { CircuitBreakerStatusMonitor.X.reset(this.circuitBreakerName); this.callCounter = new LongAdder(); this.failureCounter = new LongAdder(); this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED); this.lastFailureTime = -1L; } @SuppressWarnings("unchecked") public <T> T call(Supplier<T> supplier) { try { if (shouldAllowExecution()) { Future<T> future = this.executor.submit(warp(supplier)); T result = future.get(executionTimeout, TimeUnit.MILLISECONDS); markSuccess(); return result; } } catch (Exception e) { markNoneSuccess(); } finally { this.callCounter.increment(); } return (T) fallback; } <T> Callable<T> warp(Supplier<T> supplier) { return supplier::get; } public void call(Runnable runnable) { call(() -> { runnable.run(); return null; }); } boolean shouldAllowExecution() { // 本质是Closed状态 if (lastFailureTime == -1L) { return true; } // 没到达阈值 if (failureThreshold > failureCounter.sum()) { return true; } return shouldTryAfterRestTimeoutWindow() && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN); } boolean changeStatus(CircuitBreakerStatus o, 
CircuitBreakerStatus n) { boolean r = status.compareAndSet(o, n); if (r) { CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, o, n); } return r; } boolean shouldTryAfterRestTimeoutWindow() { long lastFailureTimeSnap = lastFailureTime; long currentTime = System.currentTimeMillis(); return currentTime > lastFailureTimeSnap + resetTimeout; } public void markSuccess() { if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) { reset(); } } public void markNoneSuccess() { this.failureCounter.increment(); if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) { this.lastFailureTime = System.currentTimeMillis(); } if (this.failureCounter.sum() >= failureThreshold && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) { this.lastFailureTime = System.currentTimeMillis(); } } }