当HystrixFeign类存在时,FeignAutoConfiguration定义的HystrixTargeter作为Targeter存在。同时FeignClientsConfiguration定义的HystrixFeign.builder作为Feign.Builder。
# spring-cloud-netflix-1.3.6.RELEASE.jar!/org.springframework.cloud.netflix.feign.HystrixTargeter
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target) {
if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
return feign.target(target);
}
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
SetterFactory setterFactory = getOptional(factory.getName(), context,
SetterFactory.class);
if (setterFactory != null) { // 1. 默认SetterFactory未定义
builder.setterFactory(setterFactory);
}
Class<?> fallback = factory.getFallback();
if (fallback != void.class) { // 2. 默认@FeignClient的fallback属性为void.class
return targetWithFallback(factory.getName(), context, target, builder, fallback);
}
Class<?> fallbackFactory = factory.getFallbackFactory();
if (fallbackFactory != void.class) { // 3. 默认@FeignClient的fallbackFactory属性为void.class
return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
}
return feign.target(target); // 4. 如果未设置fallback和fallbackFactory,与普通feign无区别
}
回到Hystrix的使用。一般需要和@SpringBootApplication一起添加@EnableHystrix注解。此注解上有@EnableCircuitBreaker注解,通过@Import形式添加了EnableCircuitBreakerImportSelector配置类。
EnableCircuitBreakerImportSelector继承SpringFactoryImportSelector,会读取“META-INF/spring.factories”文件,将“org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker”为key的属性当作配置类。
在spring-cloud-netflix-core下的spring.factories是以HystrixCircuitBreakerConfiguration作为配置类,该类定义了HystrixCommandAspect。
HystrixCommandAspect对于有@HystrixCommand或@HystrixCollapser注解的方法做环绕增强,代码如下:
# hystrix-javanica-1.5.12.jar!/com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint); // 1. 通过连接点,反射获取方法信息,如果是桥接方法,使用ASM获取真实方法
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); //2. 根据方法注解选择不同的工厂类,@HystrixCommand对应的是CommandMetaHolderFactory
MetaHolder metaHolder = metaHolderFactory.create(joinPoint); // 3. 使用MetaHolderFactory(CommandMetaHolderFactory)创建MetaHolder
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); // 4. 如果是@HystrixCollapser注解则创建CommandCollapser;当返回类型是Observable、Single、Completable时创建GenericObservableCommand,否则是GenericCommand
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) { // 5. 非Observable时执行
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder); // 6. rxjava方式执行
}
} catch (HystrixBadRequestException e) {
throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
以传统同步调用为例(非Observable):
# hystrix-javanica-1.5.12.jar!/com.netflix.hystrix.contrib.javanica.command.CommandExecutor
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
case SYNCHRONOUS: { // 1. 调用HystrixCommand执行
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: { // 2. Future返回类型
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
if (invokable instanceof HystrixExecutable) {
return (HystrixExecutable) invokable;
}
throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
HystrixCommand主要是将自身转成Observable对象,然后包装成BlockingObservable对象,再转成Future对象(即toObservable().toBlocking().toFuture().get())。在转成Future对象时执行订阅。
根据rxjava的规则,在Observable创建事件流,而Observer/Subscriber处理事件流。代码如下:
# hystrix-core-1.5.12.jar!/com.netflix.hystrix.AbstractCommand
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
//doOnCompleted handler already did all of the SUCCESS work
//doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
final Action0 terminateCommandCleanup = new Action0() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
handleCommandEnd(true); //user code did run
}
}
};
//mark the command as CANCELLED and store the latency (in addition to standard cleanup)
final Action0 unsubscribeCommandCleanup = new Action0() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(true); //user code did run
}
}
};
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
@Override
public R call(R r) {
R afterFirstApplication = r;
try {
afterFirstApplication = executionHook.onComplete(_cmd, r);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
}
try {
return executionHook.onEmit(_cmd, afterFirstApplication);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
return afterFirstApplication;
}
}
};
final Action0 fireOnCompletedHook = new Action0() {
@Override
public void call() {
try {
executionHook.onSuccess(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
}
}
};
return Observable.defer(new Func0<Observable<R>>() { // 1. 此事件在被订阅时执行call
@Override
public Observable<R> call() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { // 2. 将状态由初始的“未开始”转为“观察链已创建”
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics) // 3. 将applyHystrixSemantics这个Observerable对象也作为在被订阅时执行call方法,并由以wrapWithAllOnNextHooks为mapper的MapSubscriber来订阅
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache // 4. 构造Observable结束运行、取消订阅、执行完成的对应操作
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) { // 0.1 断路器关闭、初始化、半开且超过窗口时间时允许执行
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) { // 0.2 尝试获取执行的信号量(非信号量隔离,使用TryableSemaphoreNoOp.DEFAULT,总是返回true)
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd) // 0.3 执行命令
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback(); // 0.4 未获取到信号量时抛出异常
}
} else {
return handleShortCircuitViaFallback(); // 0.5 处理短路场景
}
}
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) { // 0.3.1 HystrixCommand默认true
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess(); // 0.3.2 将断路状态由半开变为关闭,清除断路时间(变为-1)
}
}
};
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess(); // 0.3.3 将断路状态由关闭变为半开,设置断路为当前时间
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) { // 0.3.4 线程池满拒绝执行的处理
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) { // 0.3.5 超时处理
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) { // 0.3.6 错误请求处理
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);// 0.3.7 根据信号量隔离或线程隔离来执行(线程隔离放到线程池,此时将GenericCommand.run方法作为Observable,反射调用方法,如果出现异常被AbstractCommand捕获,交给fallback处理)
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
private Observable<R> handleShortCircuitViaFallback() {
// record that we are returning a short-circuited fallback
eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
// short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
executionResult = executionResult.setExecutionException(shortCircuitException);
try {
return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
"short-circuited", shortCircuitException);
} catch (Exception e) {
return Observable.error(e);
}
}
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
// record the executionResult
// do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
executionResult = executionResult.addEvent((int) latency, eventType);
if (isUnrecoverable(originalException)) { // 0.5.1 栈溢出、虚拟机错误、线程停止、链接错误为不可恢复错误,短路和信号量拒绝都是RuntimeException
logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
/* executionHook for all errors */
Exception e = wrapWithOnErrorHook(failureType, originalException);
return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
} else {
if (isRecoverableError(originalException)) { // 0.5.2 cause是否为可恢复异常
logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
}
if (properties.fallbackEnabled().get()) {
/* fallback behavior is permitted so attempt */
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(requestContext);
}
};
final Action1<R> markFallbackEmit = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
}
}
};
final Action0 markFallbackCompleted = new Action0() {
@Override
public void call() {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
}
};
final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
/* executionHook for all errors */
Exception e = wrapWithOnErrorHook(failureType, originalException);
Exception fe = getExceptionFromThrowable(t);
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
Exception toEmit;
if (fe instanceof UnsupportedOperationException) {
logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
} else {
logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
}
// NOTE: we're suppressing fallback exception here
if (shouldNotBeWrapped(originalException)) {
return Observable.error(e);
}
return Observable.error(toEmit);
}
};
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore(); // 0.5.3 获取或创建信号量
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
fallbackSemaphore.release();
}
}
};
Observable<R> fallbackExecutionChain;
// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
try {
if (isFallbackUserDefined()) { // 1.5.4 getFallback方法存在则返回true
executionHook.onFallbackStart(this);
fallbackExecutionChain = getFallbackObservable(); // 0.5.5 将反射调用fallback方法包装成Observable
} else {
//same logic as above without the hook invocation
fallbackExecutionChain = getFallbackObservable();
}
} catch (Throwable ex) {
//If hook or user-fallback throws, then use that as the result of the fallback lookup
fallbackExecutionChain = Observable.error(ex);
}
return fallbackExecutionChain // 0.5.6 设置fallback Observable的处理
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else {
return handleFallbackRejectionByEmittingError(); // 0.5.7 抛出HystrixRuntimeException
}
} else {
return handleFallbackDisabledByEmittingError(originalException, failureType, message); // 0.5.8 抛出HystrixRuntimeException
}
}
}
大家喜欢rxjava这种风格吗?