SpringCloud-Hystrix原理 (3)

HystrixCommand#queue方法如下所示。这是Command的异步执行方法。

public Future<R> queue() {
final Observable<R> o = toObservable();
final Future<R> f = o.toBlocking().toFuture();
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
// 略去
}
}
return f;
}
 

 

2) 分析

a) HystrixComrnand执行方式

execute():同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。

queue():异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。

value = command.execute();
Future<K> fValue = command.queue();
 

b) HystrixObservableCommand执行方式

observe():返回Observable对象,它代表了操作的多个结果,它一个Hot Observable。

toObservable():同样会返回Observable对象,也代表了操作的多个结果,但它返回的是一个Cold Observable。

Observable<K> ohValue = command.observe(); //hot observable
Observable<K> ocValue = command.toObservable(); //cold observable
 

不论事件源是否有订阅者,Hot Observable都会在创建后对事件进行发布,所以对于Hot Observable的“订阅者”都有可能是从“事件源”的中途开始的,并可能只是看到了整个操作的局部过程。而Cold Observable在没有订阅者的时候并不会发布事件,而是进行等待,直到有订阅者之后才发布事件,所以对于Cold Observable的订阅者,它可以保证从一开始看到整个操作的全部过程。

3 从缓存中取结果?

1) 分析

当前Command是否启用缓存功能(即hystrix.command.default.requestCache.enabled是否为true),启用缓存,并且缓存命中时,立即返回;当返回数据后丢入缓存中去。

2) 代码

AbstractCommand#toObservable中

public Observable<R> toObservable() {
//省略
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
final AbstractCommand<R> _cmd = this;
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
//省略
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, this);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
//省略
}
 

4 断路器是否打开?

如果断路器是打开的,那么进入fallback处理流程;如果断路器是关闭的,那么进入下一步。

AbstractCommand#applyHystrixSemantics中

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
//省略
} else {
return handleShortCircuitViaFallback();
}
}
 

5 是否有资源可用?

如果此Command相关的线程池的请求队列或信号量已满,那么进入fallback处理流程,否则进入下一步。以下是信号量检查相关的代码:

AbstractCommand#applyHystrixSemantics中

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet (false, true)) {
executionSemaphore.release();
}
}
}; 
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent (HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult. setInvocationStartTime (System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
 

6 发送请求

1) 说明

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdywy.html