这一步是真正调用消费者提供服务的过程。这一步在HystrixObservableCommand#construct()或HystrixCommand#run()中实现,这两个类是抽象类,提供了多种不同的子类实现方式。HystrixCommand#run():返回单一结果或抛出异常。HystrixObservableCommand#construct():返回Observable对象,通过Observable对象可以发送多个结果,或通过onError发送错误通知。
如果run()或construct ()方法的执行时间超过了命令设置的超时阙值,当前处理线程将会抛出一个TimeoutException(如果该命令不在其自身的线程中执行,则会通过单独的计时线程来抛出),此时进入fallback流程。如果当前命令没有被取消或中断,那么它最终会忽略run()或者construct ()方法的返回。
如果命令没有抛出异常并返回了结果,那么Hystrix在记录一些日志并采集监控报告之后将该结果返回。在使用run()的情况下,Hystrix会返回一个Observable,它发射单个结果并产生onCompleted的结束通知;而在使用construct()的情况下,Hystrix会直接返回该方法产生的Observable对象。
2) 代码
HttpClientRibbonCommand#run代码如下所示:
protected ClientHttpResponse run() throws Exception {return forward();
}
protected ClientHttpResponse forward() throws Exception {
final RequestContext context = RequestContext.getCurrentContext();
Long contentLength = null;
String contentLengthHeader = context.getRequest().getHeader("Content-Length");
if (StringUtils.hasText(contentLengthHeader)) {
contentLength = new Long(contentLengthHeader);
}
URI uriInstance = new URI(this.uri);
RibbonApacheHttpRequest request = new RibbonApacheHttpRequest(this.method,
uriInstance, this.retryable, this.headers, this.params,
this.requestEntity, contentLength);
final RibbonApacheHttpResponse response = this.client
.executeWithLoadBalancer(request);
context.set("ribbonResponse", response);
// Explicitly close the HttpResponse if the Hystrix command timed out to
// release the underlying HTTP connection held by the response.
//
if (this.isResponseTimedOut()) {
if (response != null) {
response.close();
}
}
return new RibbonHttpResponse(response);
}
7 Calculate Circuit Health
1) 说明
Hystrix会将成功、失败、拒绝、超时等信息报告给断路器,断路器会根据这些报告的统计数据来决定是否要将断路器打开,进而对某个依赖服务的请求进行“熔断/短路”,直到恢复期结束。若在恢复期结束后,根据统计数据判断如果还是未达到健康指标,就再次“熔断/短路”。
在“断路器是否打开”小节我们看到,通过HystrixCircuitBreakerImpl#allowRequest方法判断是否允许请求,这其实就是根据统计的请求报告决定是否开启断路器。请重点看isOpen()和allowSingleTest()方法。
isOpen()判断当前断路器是否已经打开,它是基于HealthCounts的数据决定断路器状态。
allowSingleTest()是半开断路器,尝试放一部分请求过去,看看是否能正常请求,如果正常,并且超过一定比例后,就可以关闭断路器了。
2) 代码
HystrixCircuitBreakerImpl#allowRequest方法
@Overridepublic boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
// we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
isOpen();
// properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
return true;
}
return !isOpen() || allowSingleTest();
}
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) if the circuit is open
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds() .get()) {
// We push the 'circuitOpenedTime' ahead by ' sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet (timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
}
}
return false;
}
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}
// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();
// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties. circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}
if (health.getErrorPercentage() < properties. circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime. set(System.currentTimeMillis());
return true;
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
}
}
}
}
8 fallback流程
fallback流程我们通常也称为:服务降级。
1) 说明
a) 引起服务降级的场景
第4步,当前命令处于“熔断/短路”状态,断路器是打开时。
第5步,当前命令的线程池、请求队列、信号量被占满时。
第6步,HystrixObservableCommand#construct()或HystrixCommand#run()抛出异常的时候。
b) 降级时