【一起学源码-微服务】Hystrix 源码三:Hystrix核心流程:Hystix降级、熔断等原理剖析

上一讲我们讲解了Hystrix在配合feign的过程中,一个正常的请求逻辑该怎样处理,这里涉及到线程池的创建、HystrixCommand的执行等逻辑。

如图所示:

Hystrix线程池创建过程及线程调用原理.jpg

高清大图:https://www.processon.com/view/link/5e1c128ce4b0169fb51ce77e

本讲目录

这一讲开始讲解Hystrix的看家本领:熔断+降级。
熔断功能是Hystrix最核心的组件,当然也是最复杂的一块。
源码中细节太多,本讲我们主要还是专注于它的设计思想去学习。

目录如下:

HystrixCircuitBreaker初始化过程

Hystrix熔断机制(CLOSED/OPEN/HALF_OPEN)

fallback降级机制

源码分析 HystrixCircuitBreaker初始化过程

我们还是会以AbstractCommand为突破口,这里继续看它的构造函数,其中里面有初始化熔断器initCircuitBreaker()的过程,具体代码如下:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { if (enabled) { if (fromConstructor == null) { // 构建默认的HystrixCircuitBreaker return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics); } else { return fromConstructor; } } else { return new NoOpCircuitBreaker(); } } } public interface HystrixCircuitBreaker { public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { // circuitBreakersByCommand是一个map,key为commandKey,也就是FeignClient中定义的方法名 // 类似于ServiceAFeignClient.sayHello(String) HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name()); if (previouslyCached != null) { return previouslyCached; } // 每个commandKey都对应着自己的熔断器,如果没有则会构造一个HystrixCircuitBreaker HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics)); if (cbForCommand == null) { return circuitBreakersByCommand.get(key.name()); } else { return cbForCommand; } } class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; private Subscription subscribeToStream() { // 对HealthCounts进行订阅 // HealthCounts中包含 总请求次数、总失败次数、失败率 // HealthCounts 统计数据有变化则会回调到这里来 return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } // 判断是否要降级的核心逻辑 @Override public void onNext(HealthCounts hc) { // 一个时间窗口(默认10s钟)总请求次数是否大于circuitBreakerRequestVolumeThreshold 默认为20s if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { } else { // 错误率(总错误次数/总请求次数)小于circuitBreakerErrorThresholdPercentage(默认50%) if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { } else { // 反之,熔断状态将从CLOSED变为OPEN,且circuitOpened==>当前时间戳 if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); } } }

上面就是熔断器初始化过程,这里面做了几件事:

每个commandKey都有自己的一个熔断器
commandKey表现形式为:ServiceAFeignClient#sayHello(String)

如果commandKey不存在熔断器,则构建默认熔断器
默认熔断器会对HealthCounts进行订阅。HealthCounts中包含时间窗口内(默认10s钟)请求的总次数、失败次数、失败率

HealthCounts中统计数据有变化则会回调subscribe.onNext()方法进行熔断开启判断

熔断开启条件:

时间窗口内(默认10s钟)总请求次数大于20次

时间窗口内(默认10s钟)失败率大于50%

满足上述两个条件后熔断器状态从CLOSED变成OPEN

熔断器在第一次请求时会初始化AbtractCommand,同时也会创建对应commandKey的熔断器 ,熔断器默认都是关闭的(可配置为强制开启),只有满足触发条件才会被开启。下面就一起来看下熔断、半开等状态是如何触发的吧。

Hystrix熔断机制(CLOSED/OPEN/HALF_OPEN)

这里我们以AbstractCommand.applyHystrixSemantics() 为入口,一步步往下探究,这个方法在上一讲已经提到过,一个正常的Feign请求都会调用此方法。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 如果熔断了,这这里返回为false // 这里也包含HALF_OPEN逻辑 if (circuitBreaker.attemptExecution()) { final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygwzj.html