HystrixCommand#queue方法如下所示。这是Command的异步执行方法。
public Future<R> queue() {final Observable<R> o = toObservable();
final Future<R> f = o.toBlocking().toFuture();
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
// 略去
}
}
return f;
}
2) 分析
a) HystrixComrnand执行方式
execute():同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。
queue():异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。
value = command.execute();Future<K> fValue = command.queue();
b) HystrixObservableCommand执行方式
observe():返回Observable对象,它代表了操作的多个结果,它一个Hot Observable。
toObservable():同样会返回Observable对象,也代表了操作的多个结果,但它返回的是一个Cold Observable。
Observable<K> ohValue = command.observe(); //hot observableObservable<K> ocValue = command.toObservable(); //cold observable
不论事件源是否有订阅者,Hot Observable都会在创建后对事件进行发布,所以对于Hot Observable的“订阅者”都有可能是从“事件源”的中途开始的,并可能只是看到了整个操作的局部过程。而Cold Observable在没有订阅者的时候并不会发布事件,而是进行等待,直到有订阅者之后才发布事件,所以对于Cold Observable的订阅者,它可以保证从一开始看到整个操作的全部过程。
3 从缓存中取结果?
1) 分析
当前Command是否启用缓存功能(即hystrix.command.default.requestCache.enabled是否为true),启用缓存,并且缓存命中时,立即返回;当返回数据后丢入缓存中去。
2) 代码
AbstractCommand#toObservable中
public Observable<R> toObservable() {//省略
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
final AbstractCommand<R> _cmd = this;
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
//省略
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, this);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
//省略
}
4 断路器是否打开?
如果断路器是打开的,那么进入fallback处理流程;如果断路器是关闭的,那么进入下一步。
AbstractCommand#applyHystrixSemantics中
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
//省略
} else {
return handleShortCircuitViaFallback();
}
}
5 是否有资源可用?
如果此Command相关的线程池的请求队列或信号量已满,那么进入fallback处理流程,否则进入下一步。以下是信号量检查相关的代码:
AbstractCommand#applyHystrixSemantics中
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet (false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent (HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult. setInvocationStartTime (System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
6 发送请求
1) 说明