上一小节已经实现了基于线程池隔离的资源调用,但仍有一个明显的不足:断路器的状态管理和重置并不符合生产场景。HALF_OPEN -> CLOSED 的状态切换和重置不应该在放行的单个调用成功之后立刻触发,而应该建立在"一定时间范围内,调用的(平均)失败率下降到某个阈值,或者调用的(平均)成功率恢复到某个阈值"的基础上;否则在很多场景下会导致断路器的状态频繁切换,功能基本处于失效状态。也就是说,在大多数场景下,以一段时间内的失败率(failurePercent)作为判断依据,会比直接拿异常计数与failureThreshold对比更加准确。为此可以引入滑动窗口(Sliding Window)的概念,记录每个时间单元内的调用总次数、调用成功次数、调用超时次数和非超时的调用失败次数。为了简化实现,这里把时间单元定义为1秒:
定义一个用于记录这四种调用次数的桶(Bucket)类,以及用于汇总计数快照的MetricInfo类。这里的实现与上图略有不同:非超时失败改为统计被线程池拒绝的任务数,而失败统计同时包括任务执行超时和一般的业务异常:
@RequiredArgsConstructor @Getter public class MetricInfo { private final long total; private final long success; private final long failure; private final long reject; public static final MetricInfo EMPTY = new MetricInfo(0, 0, 0, 0); public MetricInfo merge(MetricInfo other) { return new MetricInfo( this.total + other.getTotal(), this.success + other.getSuccess(), this.failure + other.getFailure(), this.reject + other.getReject() ); } } public class Bucket { // 记录窗口开始的时间戳 @Getter private final long windowStartTimestamp; private final LongAdder total; private final LongAdder success; private final LongAdder failure; private final LongAdder reject; public Bucket(long windowStartTimestamp) { this.windowStartTimestamp = windowStartTimestamp; this.total = new LongAdder(); this.success = new LongAdder(); this.reject = new LongAdder(); this.failure = new LongAdder(); } public void increaseTotal() { this.total.increment(); } public void increaseSuccess() { this.success.increment(); } public void increaseFailure() { this.failure.increment(); } public void increaseReject() { this.reject.increment(); } public long totalCount() { return this.total.sum(); } public long successCount() { return this.success.sum(); } public long failureCount() { return this.failure.sum(); } public long rejectCount() { return this.reject.sum(); } public void reset() { this.total.reset(); this.success.reset(); this.failure.reset(); this.reject.reset(); } public MetricInfo metricInfo() { return new MetricInfo( totalCount(), successCount(), failureCount(), rejectCount() ); } @Override public String toString() { return String.format("Bucket[wt=%d,t=%d,s=%d,f=%d,r=%d]", windowStartTimestamp, totalCount(), successCount(), failureCount(), rejectCount() ); } }在Hystrix中,为了更加灵活,Bucket中的计数器设计为LongAdder[]类型,便于通过各种需要计数事件枚举的顺序值来直接进行计数和累加,而为了节约内存空间,滑动窗口设计成一个容量固定可复用的环形队列BucketCircularArray#ListState,这里可以站在巨人的肩膀上借鉴其思路实现BucketCircular:
public class BucketCircular implements Iterable<Bucket> { private final AtomicReference<BucketArray> bucketArray; public BucketCircular(int bucketNumber) { // 这里有个技巧,初始化数组的时候让数组的总长度为桶数量+1,便于额外的添加和移除桶操作 AtomicReferenceArray<Bucket> buckets = new AtomicReferenceArray<>(bucketNumber + 1); this.bucketArray = new AtomicReference<>(new BucketArray(buckets, 0, 0, bucketNumber)); } public Bucket getTail() { return this.bucketArray.get().tail(); } /** * 在环形队列尾部添加一个桶 */ public void addTail(Bucket bucket) { BucketArray bucketArray = this.bucketArray.get(); BucketArray newBucketArray = bucketArray.addBucket(bucket); // 这个方法会在锁中执行,理论上不会CAS失败 this.bucketArray.compareAndSet(bucketArray, newBucketArray); } public Bucket[] toArray() { return this.bucketArray.get().toArray(); } public int size() { return this.bucketArray.get().getSize(); } @Override public Iterator<Bucket> iterator() { return Collections.unmodifiableList(Arrays.asList(toArray())).iterator(); } public void clear() { while (true) { BucketArray bucketArray = this.bucketArray.get(); BucketArray clear = bucketArray.clear(); if (this.bucketArray.compareAndSet(bucketArray, clear)) { return; } } } }