这里面核心业务是起一个调度任务,默认每秒钟执行一次,然后调用tick()方法,如果当前command状态还是NOT_EXECUTED状态,那么将command状态改为TIMED_OUT 。此时会进入到之前的handleFallback回调函数中,这里又会更新HealthCounts中的数据,对应的触发之前熔断的判断条件:
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur Subscription s = subscribeToStream(); activeSubscription.set(s); } private Subscription subscribeToStream() { //这里会在每次执行onNext()事件的时候来评估是否需要打开或者关闭断路器 return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(HealthCounts hc) { //首先校验的时在时间窗范围内的请求次数,如果低于阈值(默认是20),不做处理,如果高于阈值,则去判断接口请求的错误率 if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // 如果没有超过统计阈值的最低窗口值,就没有必要去改变断路器的状态 // 当前如果断路器是关闭的,那么就保持关闭状态无需更改; // 如果断路器状态为半开状态,需要等待直到有成功的命令执行; // 如果断路器是打开状态,需要等待休眠窗口过期。 } else { //判断接口请求的错误率(阈值默认是50),如果高于这个值,则断路器打开 if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { // 如果当前请求的错误率小于断路器设置的容错率百分比,也不会拦截请求 } else { // 如果当前错误率太高则打开断路器 if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); }如果符合熔断条件,那么command熔断状态就会变为OPEN,此时熔断器打开。
如果我们command执行成功,那么就会清理掉这个timeout timer schedule任务。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private void handleCommandEnd(boolean commandExecutionStarted) { Reference<TimerListener> tl = timeoutTimer.get(); // 如果timeOutTimer不为空,这里则clear一下 // clear会关闭启动的调度任务 if (tl != null) { tl.clear(); } long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp; executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency); if (executionResultAtTimeOfCancellation == null) { // metrics统计数据 metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted); } else { metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted); } if (endCurrentThreadExecutingCommand != null) { endCurrentThreadExecutingCommand.call(); } } }如上所属,我们已经知道了熔断开启的触发时机,那么如果一个commandKey开启了熔断,下次的请求是该如何直接降级呢?我们来看下代码:
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 这个if条件就代表是否开启熔断 if (circuitBreaker.attemptExecution()) { // 执行业务逻辑代码... } else { return handleShortCircuitViaFallback(); } } } class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { public boolean attemptExecution() { // 如果熔断配置的为强制开启,那么直接返回false执行熔断逻辑 if (properties.circuitBreakerForceOpen().get()) { return false; } // 如果熔断配置为强制关闭,那么永远不走熔断逻辑 if (properties.circuitBreakerForceClosed().get()) { return true; } // 熔断开启时 circuitOpened设置为当前时间戳 if (circuitOpened.get() == -1) { return true; } else { // 如果当前时间距离熔断小于5s钟,那么将熔断状态从OPEN改为HALF_OPEN if (isAfterSleepWindow()) { if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { //only the first request after sleep window should execute return true; } else { return false; } } else { return false; } } } } private boolean isAfterSleepWindow() { final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); // circuitBreakerSleepWindowInMilliseconds 默认为5s钟 final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); // 当前熔断距离熔断是否超过5s钟 return currentTime > circuitOpenTime + sleepWindowTime; } }我们可以看到,在applyHystrixSemantics()这个核心的方法中,先判断是否熔断,如果熔断则直接走fallback逻辑。