最后,把SlidingWindowMonitor和之前的ResourceCircuitBreaker做一次融合进化,得到SlidingWindowCircuitBreaker:
package cn.throwx.cb;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Circuit breaker that runs guarded calls on the executor of a registered
 * resource and trips based on the error percentage reported by a
 * {@link SlidingWindowMonitor}.
 *
 * <p>State transitions performed by this class:
 * CLOSED -&gt; OPEN when a failure pushes the rolling error percentage to the
 * threshold; OPEN -&gt; HALF_OPEN once {@code resetTimeout} milliseconds have
 * passed since the breaker tripped; HALF_OPEN -&gt; CLOSED on a successful probe;
 * HALF_OPEN -&gt; OPEN on a failed or rejected probe.
 *
 * @author throwable
 * @version v1
 * @description
 * @since 2020/10/25 17:14
 */
public class SlidingWindowCircuitBreaker {

    /**
     * Error percentage threshold; a failure that brings the rolling error
     * percentage to this value (or above) trips the breaker.
     */
    private final long errorPercentThreshold;

    /**
     * How long the breaker stays open before a probe call is allowed, in
     * milliseconds (compared against {@link System#currentTimeMillis()}).
     */
    private final long resetTimeout;

    /**
     * Current breaker state. The reference is final and updated in place so
     * that concurrent compare-and-set calls always target the same object.
     */
    private final AtomicReference<CircuitBreakerStatus> status =
            new AtomicReference<>(CircuitBreakerStatus.CLOSED);

    private final ThreadPoolExecutor executor;

    private final String circuitBreakerName;

    /**
     * Timestamp of the moment the breaker last tripped to OPEN, or -1 while it
     * is closed. Volatile because it is written and read by different caller
     * threads.
     */
    private volatile long lastFailureTime;

    /**
     * Upper bound for a single execution, in milliseconds.
     */
    private final long executionTimeout;

    /**
     * Sliding window monitor that collects total/success/failure/reject counts.
     */
    private final SlidingWindowMonitor slidingWindowMonitor;

    /**
     * Creates a breaker bound to the resource registered under {@code resourceName}.
     *
     * @param resourceName          name used to look up the resource in {@code CircuitBreakerResourceManager.X}
     * @param errorPercentThreshold error percentage at which the breaker trips
     * @param resetTimeout          open-state wait time in milliseconds before a probe is allowed
     */
    public SlidingWindowCircuitBreaker(String resourceName,
                                       long errorPercentThreshold,
                                       long resetTimeout) {
        CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName);
        this.circuitBreakerName = "SlidingWindowCircuitBreaker-" + resourceName;
        this.executor = resource.getExecutor();
        this.executionTimeout = resource.getTimeout();
        this.errorPercentThreshold = errorPercentThreshold;
        this.resetTimeout = resetTimeout;
        this.slidingWindowMonitor = new SlidingWindowMonitor();
        reset();
    }

    /**
     * Puts the breaker back into the CLOSED state and clears the trip timestamp.
     * The sliding window counters are left untouched.
     */
    public void reset() {
        CircuitBreakerStatusMonitor.X.reset(this.circuitBreakerName);
        this.status.set(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }

    /**
     * Runs {@code supplier} through the breaker using the shared default
     * fallback {@code Fallback.F}.
     *
     * @param supplier the guarded call
     * @param <T>      result type
     * @return the supplier's result, or the fallback's result when the call is
     * not allowed, rejected, times out or fails
     */
    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        return call(supplier, (Fallback<T>) Fallback.F);
    }

    /**
     * Runs {@code supplier} on the resource executor, waiting at most
     * {@code executionTimeout} milliseconds for the result.
     *
     * @param supplier the guarded call
     * @param fallback produces the return value when the call is short-circuited,
     *                 rejected by the executor, times out or throws
     * @param <T>      result type
     * @return the supplier's result on success, otherwise {@code fallback.fallback()}
     */
    public <T> T call(Supplier<T> supplier, Fallback<T> fallback) {
        Future<T> future = null;
        try {
            if (shouldAllowExecution()) {
                slidingWindowMonitor.incrementTotal();
                future = this.executor.submit(warp(supplier));
                T result = future.get(executionTimeout, TimeUnit.MILLISECONDS);
                markSuccess();
                return result;
            }
        } catch (RejectedExecutionException ree) {
            markReject();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the caller's interruption visible instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            if (future != null) {
                // After a timeout or interruption the task may still be running;
                // cancel it so it stops occupying a pool thread. This is a no-op
                // when the task has already completed.
                future.cancel(true);
            }
            markFailure();
        }
        return fallback.fallback();
    }

    /**
     * Adapts a {@link Supplier} to a {@link Callable} for submission to the executor.
     */
    <T> Callable<T> warp(Supplier<T> supplier) {
        return supplier::get;
    }

    /**
     * Runs {@code runnable} through the breaker, discarding the result.
     *
     * @param runnable the guarded action
     */
    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }

    /**
     * Decides whether the next call may be executed.
     *
     * <p>While the breaker is tripped, the only way out is the reset timeout:
     * the rolling error percentage is deliberately NOT consulted here, because
     * it can drop below the threshold (e.g. when the window holds no samples)
     * and would then block the probe call indefinitely.
     *
     * @return {@code true} when closed, or when this caller won the
     * OPEN -&gt; HALF_OPEN transition after the reset timeout elapsed
     */
    boolean shouldAllowExecution() {
        // lastFailureTime is only set when the breaker trips, so -1 means CLOSED.
        if (lastFailureTime == -1L) {
            return true;
        }
        // Only the caller that wins the CAS gets to send the probe request.
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }

    /**
     * Atomically moves the state from {@code o} to {@code n} and reports the
     * transition to the status monitor when it succeeds.
     *
     * @return {@code true} if the state was {@code o} and is now {@code n}
     */
    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, o, n);
        }
        return r;
    }

    /**
     * @return {@code true} when more than {@code resetTimeout} milliseconds have
     * passed since the breaker last tripped
     */
    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }

    /**
     * Records a successful call; a successful probe in HALF_OPEN closes the breaker.
     */
    public void markSuccess() {
        slidingWindowMonitor.incrementSuccess();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }

    /**
     * Records a call rejected by the executor; a rejected probe in HALF_OPEN
     * re-opens the breaker and restarts the reset timeout.
     */
    public void markReject() {
        slidingWindowMonitor.incrementReject();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }

    /**
     * Computes the error percentage (rejects plus failures over total) from the
     * monitor's rolling metrics and reports it to the status monitor.
     *
     * @return the percentage rounded down, or 0 when no calls are recorded
     */
    public int rollingErrorPercentage() {
        MetricInfo rollingMetricInfo = slidingWindowMonitor.getRollingMetricInfo();
        long rejectCount = rollingMetricInfo.getReject();
        long failureCount = rollingMetricInfo.getFailure();
        long totalCount = rollingMetricInfo.getTotal();
        // Integer arithmetic keeps exact ratios exact: the floating point form
        // (double) 29 / 100 * 100 truncates to 28. The guard avoids dividing by
        // zero when the window is empty.
        int errorPercentage = totalCount > 0
                ? (int) ((rejectCount + failureCount) * 100 / totalCount)
                : 0;
        CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, String.format("错误百分比:%d", errorPercentage));
        return errorPercentage;
    }

    /**
     * Records a failed call. A failed probe in HALF_OPEN re-opens the breaker;
     * in CLOSED the breaker trips once the rolling error percentage reaches the
     * threshold. Either transition restarts the reset timeout.
     */
    public void markFailure() {
        slidingWindowMonitor.incrementFailure();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (rollingErrorPercentage() >= errorPercentThreshold
                && changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
编写一个测试客户端SlidingWindowCircuitBreakerClient:
public class SlidingWindowCircuitBreakerClient {

    /**
     * Demo driver: registers the "SERVICE" resource, then pushes several
     * batches of calls through a {@link SlidingWindowCircuitBreaker}, pausing
     * slightly longer than the breaker's reset timeout between batches.
     */
    public static void main(String[] args) throws Exception {
        CircuitBreakerResourceConf resourceConf = new CircuitBreakerResourceConf();
        resourceConf.setCoreSize(10);
        resourceConf.setQueueSize(0);
        resourceConf.setResourceName("SERVICE");
        resourceConf.setTimeout(50);
        CircuitBreakerResourceManager.X.register(resourceConf);

        Service target = new Service();
        SlidingWindowCircuitBreaker breaker = new SlidingWindowCircuitBreaker("SERVICE", 50, 500);

        // First batch: ten calls with random latency against a 50 ms timeout.
        invokeProcess(breaker, target, 10);

        // Wait just past the 500 ms reset timeout, then send a fast call.
        Thread.sleep(501L);
        breaker.call(target::processSuccess);

        // Second batch: three more calls with random latency.
        invokeProcess(breaker, target, 3);

        Thread.sleep(501L);
        breaker.call(target::processSuccess);
        breaker.call(target::processSuccess);
    }

    /**
     * Calls {@code target.process(n)} through the breaker for n = 0..rounds-1
     * and prints each returned value.
     */
    private static void invokeProcess(SlidingWindowCircuitBreaker breaker, Service target, int rounds) {
        for (int number = 0; number < rounds; number++) {
            final int current = number;
            String outcome = breaker.call(() -> target.process(current));
            String line = String.format("返回结果:%s,number:%d", outcome, current);
            System.out.println(line);
        }
    }

    /**
     * Sample service used by the demo.
     */
    public static class Service {

        private final Random random = new Random();

        /**
         * Logs the call, sleeps for a random 0-199 ms and returns the number as text.
         */
        public String process(int i) {
            int pauseMillis = random.nextInt(200);
            String threadName = Thread.currentThread().getName();
            String line = String.format("线程[%s]-进入process方法,number:%d,休眠%d毫秒", threadName, i, pauseMillis);
            System.out.println(line);
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException ignore) {
                // Interruption is ignored; the method still returns normally.
            }
            return Integer.toString(i);
        }

        /**
         * Logs the call and returns immediately.
         */
        public void processSuccess() {
            String threadName = Thread.currentThread().getName();
            System.out.println(String.format("线程[%s]-调用processSuccess方法", threadName));
        }
    }
}