【一起学源码-微服务】Hystrix 源码三:Hystrix核心流程:Hystix降级、熔断等原理剖析 (2)

circuitBreaker.attemptExecution() 这个逻辑就是判断,如果熔断了,那么返回false。而且这里还包含HALF_OPEN的逻辑,我们先看如何触发熔断的,这个后面再接着看。

接着往下跟进executeCommandAndObserve() 方法:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); // 省略部分代码... // 运行过程中,出现异常等都会进入此回调函数 final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { // 这里创建一个 HystrixObservableTimeoutOperator execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); } }

当我们服务调用中出现异常都会进入handleFallback()中,里面的方法我们就不继续跟入了,猜测里面会有更新HealthCounts中的属性,然后触发 HystrixCircuitBreaker中的onNext()方法,当满足熔断条件时 则会将熔断状态从CLOSED变成OPEN。

这里我们会跟进下HystrixObservableTimeoutOperator 代码,这个是对我们执行过程中判断是否超时。
上面代码中,执行executeCommandWithSpecifiedIsolation() 方法时也会创建一个超时监视器:

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> { final AbstractCommand<R> originalCommand; public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) { this.originalCommand = originalCommand; } @Override public Subscriber<? super R> call(final Subscriber<? super R> child) { TimerListener listener = new TimerListener() { @Override public void tick() { // 判断command的timeOut状态,如果是未执行状态,则更新为已超时 if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); s.unsubscribe(); final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { @Override public void run() { child.onError(new HystrixTimeoutException()); } }); timeoutRunnable.run(); } } @Override public int getIntervalTimeInMilliseconds() { return originalCommand.properties.executionTimeoutInMilliseconds().get(); } }; final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener); originalCommand.timeoutTimer.set(tl); // 省略部分代码... s.add(parent); return parent; } } public class HystrixTimer { public Reference<TimerListener> addTimerListener(final TimerListener listener) { startThreadIfNeeded(); Runnable r = new Runnable() { @Override public void run() { try { // 执行上面的tick方法,改变command timeout状态 listener.tick(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); } } }; // 执行调度任务,延迟加载,延迟时间和调度时间默认都为1s钟 // 这里使用线程池,coreSize=cpu核心数 maxSize为Integer.Max ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); return new TimerReference(listener, f); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygwzj.html