【Hystrix技术指南】(7)故障切换的运作流程原理分析(含源码)

背景介绍

目前对于一些非核心操作,如增减库存后保存操作日志发送异步消息时(具体业务流程),一旦出现MQ服务异常时,会导致接口响应超时,因此可以考虑对非核心操作引入服务降级、服务隔离。

Hystrix说明

官方文档

Hystrix是Netflix开源的一个容灾框架,解决当外部依赖故障时拖垮业务系统、甚至引起雪崩的问题。

为什么需要Hystrix?

  • 在大中型分布式系统中,通常系统很多依赖(HTTP,hession,Netty,Dubbo等),在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等。
  • *当依赖阻塞时,大多数服务器的线程池就出现阻塞(BLOCK),影响整个线上服务的稳定性,在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。
例如:一个依赖30SOA服务的系统,每个服务99.9999.990.3换算成时间大约每月有2个小时服务不稳定.随着服务依赖数量的变多,服务不稳定的概率会成指数性提高.解决问题方案:对依赖做隔离。
复制代码

Hystrix设计理念

想要知道如何使用,必须先明白其核心设计理念,Hystrix基于命令模式,通过UML图先直观的认识一下这一设计模式

  • 可见,Command是在 ReceiverInvoker之间添加的中间层, Command实现了对Receiver的封装
  • API既可以是Invoker又可以是reciever,通过继承Hystrix核心类HystrixCommand来封装这些API(例如,远程接口调用,数据库查询之类可能会产生延时的操作)
  • *就可以为API提供弹性保护了。

Hystrix如何解决依赖隔离

  1. Hystrix使用命令模式HystrixCommand(Command)包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行。
  2. 可配置依赖调用超时时间,超时时间一般设为比99.5%平均时间略高即可。当调用超时时,直接返回或执行fallback逻辑。
  3. 为每个依赖提供一个小的线程池(或信号),如果线程池已满调用将被立即拒绝,默认不采用排队,加速失败判定时间。
  4. 依赖调用结果分,成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行fallback(降级)逻辑。
  5. 提供熔断器组件,可以自动运行或手动调用,停止当前依赖一段时间(10秒),熔断器默认错误率阈值为50%,超过将自动运行
  6. 提供近实时依赖的统计和监控

Hystrix流程结构解析

流程说明:

  1. 每次调用构建HystrixCommand或者HystrixObservableCommand对象,把依赖调用封装在run()方法中.
  2. 结果是否有缓存如果没有执行execute()/queue做sync或async调用,对应真正的run()/construct()
  3. 判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入步骤.
  4. 判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.
  5. 使用HystrixObservableCommand.construct()还是HystrixCommand.run(),运行依赖逻辑
  6. 依赖逻辑调用超时,进入步骤8
  7. 判断逻辑是否调用成功
  • 6a 返回成功调用结果
  • 6b 调用出错,进入步骤8.
  1. 计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.
  2. getFallback()降级逻辑. a. 没有实现getFallback的Command将直接抛出异常 b. fallback降级逻辑调用成功直接返回 c. 降级逻辑调用失败抛出异常
  3. 返回执行成功结果

以下四种情况将触发getFallback调用:

  1. run()方法抛出非HystrixBadRequestException异常
  2. run()方法调用超时
  3. 熔断器开启短路调用
  4. 线程池/队列/信号量是否跑满

熔断器:Circuit Breaker

每个熔断器默认维护10个bucket,每秒一个bucket,每个bucket记录成功,失败,超时,拒绝的状态,默认错误超过50%且10秒内超过20个请求进行中断短路。

Hystrix隔离分析

Hystrix隔离方式采用线程/信号的方式,通过隔离限制依赖的并发量和阻塞扩散.

线程隔离

  • 执行依赖代码的线程与请求线程(如:jetty线程)分离,请求线程可以自由控制离开的时间(异步过程)。
  • 通过线程池大小可以控制并发量,当线程池饱和时可以提前拒绝服务,防止依赖问题扩散。
  • *线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。
实际案例:

Netflix公司内部认为线程隔离开销足够小,不会造成重大的成本或性能的影响。Netflix 内部API 每天100亿的HystrixCommand依赖请求使用线程隔,每个应用大约40多个线程池,每个线程池大约5-20个线程。

信号隔离

信号隔离也可以用于限制并发访问,防止阻塞扩散, 与线程隔离最大不同在于执行依赖代码的线程依然是请求线程(该线程需要通过信号申请),如果客户端是可信的且可以快速返回,可以使用信号隔离替换线程隔离,降低开销。

信号量的大小可以动态调整, 线程池大小不可以。

线程隔离与信号隔离区别如下图:

fallback故障切换降级机制

有兴趣的小伙伴可以看看:官方参考文档

源码分析

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

executeCommandAndObserve
private Observable executeCommandAndObserve(final AbstractCommand _cmd) {final Func1> handleFallback = new Func1>() {public Observable call(Throwable t) {circuitBreaker.markNonSuccess();Exception e = getExceptionFromThrowable(t);executionResult = executionResult.setExecutionException(e);if (e instanceof RejectedExecutionException) {return handleThreadPoolRejectionViaFallback(e);} else if (t instanceof HystrixTimeoutException) {return handleTimeoutViaFallback();} else if (t instanceof HystrixBadRequestException) {return handleBadRequestByEmittingError(e);} else {if (e instanceof HystrixBadRequestException) {eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);return Observable.error(e);}return handleFailureViaFallback(e);}}};Observable execution;if (properties.executionTimeoutEnabled().get()) {execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));} else {execution = executeCommandWithSpecifiedIsolation(_cmd);}return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext);}
复制代码

使用Observable的onErrorResumeNext,里头调用了handleFallback,handleFallback中区分不同的异常来调用不同的fallback。

  • RejectedExecutionException调用handleThreadPoolRejectionViaFallback
  • HystrixTimeoutException调用handleTimeoutViaFallback
  • *非HystrixBadRequestException的调用handleFailureViaFallback
applyHystrixSemantics
    private Observable applyHystrixSemantics(final AbstractCommand _cmd) {executionHook.onStart(_cmd);if (circuitBreaker.attemptExecution()) {final TryableSemaphore executionSemaphore = getExecutionSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);final Action0 singleSemaphoreRelease = new Action0() {public void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {executionSemaphore.release();}}};final Action1 markExceptionThrown = new Action1() {public void call(Throwable t) {eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);}};if (executionSemaphore.tryAcquire()) {try {executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());return executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} catch (RuntimeException e) {return Observable.error(e);}} else {return handleSemaphoreRejectionViaFallback();}} else {return handleShortCircuitViaFallback();}}
复制代码
  • applyHystrixSemantics方法针对executionSemaphore.tryAcquire()没通过的调用
  • handleSemaphoreRejectionViaFallback
  • applyHystrixSemantics方法针对circuitBreaker.attemptExecution()没通过的调用handleShortCircuitViaFallback()
ViaFallback方法
    private Observable handleSemaphoreRejectionViaFallback() {Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");executionResult = executionResult.setExecutionException(semaphoreRejectionException);eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);logger.debug("HystrixCommand Execution Rejection by Semaphore.");return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,"could not acquire a semaphore for execution", semaphoreRejectionException);}private Observable handleShortCircuitViaFallback() {eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");executionResult = executionResult.setExecutionException(shortCircuitException);try {return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,"short-circuited", shortCircuitException);} catch (Exception e) {return Observable.error(e);}}private Observable handleThreadPoolRejectionViaFallback(Exception underlying) {eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);threadPool.markThreadRejection();return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying);}private Observable handleTimeoutViaFallback() {return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());}private Observable handleFailureViaFallback(Exception underlying) {logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);executionResult = executionResult.setException(underlying);return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);}
复制代码
  • handleSemaphoreRejectionViaFallback、handleShortCircuitViaFallback、handleThreadPoolRejectionViaFallback、handleTimeoutViaFallback、handleFailureViaFallback这几个方法调用了getFallbackOrThrowException
  • 其eventType分别是SEMAPHORE_REJECTED、SHORT_CIRCUITED、THREAD_POOL_REJECTED、TIMEOUT、FAILURE
  • AbstractCommand.getFallbackOrThrowException

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

private Observable getFallbackOrThrowException(final AbstractCommand _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();executionResult = executionResult.addEvent((int) latency, eventType);if (isUnrecoverable(originalException)) {logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);Exception e = wrapWithOnErrorHook(failureType, originalException);return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));} else {if (isRecoverableError(originalException)) {logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);}if (properties.fallbackEnabled().get()) {final Action1super R>> setRequestContext = new Action1super R>>() {public void call(Notificationsuper R> rNotification) {setRequestContextIfNeeded(requestContext);}};final Action1 markFallbackEmit = new Action1() {public void call(R r) {if (shouldOutputOnNextEvents()) {executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);}}};final Action0 markFallbackCompleted = new Action0() {public void call() {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);executionResult = executionResult.addEvent((int) latency,HystrixEventType.FALLBACK_SUCCESS);}};final Func1> handleFallbackError = new Func1>() {public Observable call(Throwable t) {Exception e = wrapWithOnErrorHook(failureType, originalException);Exception fe = getExceptionFromThrowable(t);long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();Exception toEmit;if (fe instanceof UnsupportedOperationException) {logger.debug("No fallback for HystrixCommand. ", fe);eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);} else {logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);}if (shouldNotBeWrapped(originalException)) {return Observable.error(e);}return Observable.error(toEmit);}};final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);final Action0 singleSemaphoreRelease = new Action0() {public void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {fallbackSemaphore.release();}}};Observable fallbackExecutionChain;if (fallbackSemaphore.tryAcquire()) {try {if (isFallbackUserDefined()) {executionHook.onFallbackStart(this);fallbackExecutionChain = getFallbackObservable();} else {fallbackExecutionChain = getFallbackObservable();}} catch (Throwable ex) {fallbackExecutionChain = Observable.error(ex);}return fallbackExecutionChain.doOnEach(setRequestContext).lift(new FallbackHookApplication(_cmd)).lift(new DeprecatedOnFallbackHookApplication(_cmd)).doOnNext(markFallbackEmit).doOnCompleted(markFallbackCompleted).onErrorResumeNext(handleFallbackError).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} else {return handleFallbackRejectionByEmittingError();}} else {return handleFallbackDisabledByEmittingError(originalException, failureType, message);}}}
复制代码
  • fallbackExecutionChain的onErrorResumeNext,调用了handleFallbackError
  • fallbackExecutionChain的doOnCompleted,调用了markFallbackCompleted
  • AbstractCommand.getFallbackSemaphore

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

protected TryableSemaphore getFallbackSemaphore() {if (fallbackSemaphoreOverride == null) {TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name());if (_s == null) {fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));return fallbackSemaphorePerCircuit.get(commandKey.name());} else {return _s;}} else {return fallbackSemaphoreOverride;}}
复制代码

针对每个commandKey获取或创建TryableSemaphoreActual

fallback源码分析小结

hystrix的fallback主要分为5种类型:

  • SEMAPHORE_REJECTED对应handleSemaphoreRejectionViaFallback
  • SHORT_CIRCUITED对应handleShortCircuitViaFallback
  • THREAD_POOL_REJECTED对应handleThreadPoolRejectionViaFallback
  • TIMEOUT对应handleTimeoutViaFallback
  • FAILURE对应handleFailureViaFallback
  • 这几个方法最后都调用了getFallbackOrThrowException方法。

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/87783.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学术论文GPT源码解读:从chatpaper、chatwithpaper到gpt_academic

前言 之前7月中旬,我曾在微博上说准备做“20个LLM大型项目的源码解读” 针对这个事,目前的最新情况是 已经做了的:LLaMA、Alpaca、ChatGLM-6B、deepspeedchat、transformer、langchain、langchain-chatglm知识库准备做的:chatpa…

PS/LR2024专用智能磨皮插件Portraiture提高P图效率

Portraiture 4智能磨皮插件支持Photoshop和Lightroom!Portraiture是一款智能磨皮插件,为Photoshop和Lightroom添加一键磨皮美化功能,快速对照片中皮肤、头发、眉毛等部位进行美化,无需手动调整,大大提高P图效率。全新4…

分布式搜索ElasticSearch-ES(一)

一、ElasticSearch介绍 ES是一款非常强大的开源搜索引擎,可以帮我们从海量的数据中快速找到我们需要的内容。 ElasticSearch结合kibana、Logstash、Beats,也就是elastic stack(ELK),被广泛运用在日志数据分析,实时监控等领域。 …

C#应用处理传入参数 - 开源研究系列文章

今天介绍关于C#的程序传入参数的处理例子。 程序的传入参数应用比较普遍,特别是一个随操作系统启动的程序,需要设置程序启动的时候不显示主窗体,而是在后台运行,于是就有了传入参数问题,比如传入/h或者/min等等。所以此…

【MySQL】表的内外连接

目录 一、内连接 二、外连接 1、左外连接 2、右外连接 一、内连接 内连接实际上就是利用where子句对两种表形成的笛卡儿积进行筛选,我们前面学习的查询都是内连接,也是在开发过程中使用的最多的连接查询。 语法: select 字段 from 表1 i…

【Linux】进程间通信之管道

【Linux】进程间通信之管道 进程间通信进程间通信目的进程间通信的方式 管道(内核维护的缓冲区)匿名管道(用于父子间进程间通信)简单使用阻塞状态读写特征非阻塞状态读写特征 匿名管道特点命名管道 匿名管道与命名管道的区别 进程…

【electron】electron安装过慢和打包报错:Unable to load file:

文章目录 一、安装过慢问题:二、打包报错:Unable to load file: 一、安装过慢问题: 一直处于安装过程 【解决】 #修改npm的配置文件 npm config edit#添加配置 electron_mirrorhttps://cdn.npm.taobao.org/dist/electron/二、打包报错:Unable to load…

Spring Boot 统一功能处理(拦截器实现用户登录权限的统一校验、统一异常返回、统一数据格式返回)

目录 1. 用户登录权限校验 1.1 最初用户登录权限效验 1.2 Spring AOP 用户统⼀登录验证 1.3 Spring 拦截器 (1)创建自定义拦截器 (2)将自定义拦截器添加到系统配置中,并设置拦截的规则 1.4 练习:登录…

for macOS-21.1.0.3267中文直装版功能介绍及系统配置要求

FL Studio 21简称FL水果软件,全称是:Fruity Loops Studio编曲,由于其Logo长的比较像一款水果因此,在大家更多的是喜欢称他为水果萝卜,FL studio21是目前最新的版本,这是一款可以让你的计算机就像是一个全功能的录音室&…

最强自动化测试框架Playwright(10)- 截图

截图 捕获屏幕截图并将其保存到文件中: page.screenshot(path"screenshot.png")可将页面截图保存为screen.png import osfrom playwright.sync_api import Playwright, expect, sync_playwrightdef run(playwright: Playwright) -> None:browser p…

数学建模(二)线性规划

课程推荐:6 线性规划模型基本原理与编程实现_哔哩哔哩_bilibili 在人们的生产实践中,经常会遇到如何利用现有资源来安排生产,以取得最大经济效益的问题。此类问题构成了运筹学的一个重要分支:数学规划。而线性规划(Linear Program…

android Ndk Jni动态注册方式以及静态注册

目录 一.静态注册方式 二.动态注册方式 三.源代码 一.静态注册方式 1.项目名\app\src\main下新建一个jni目录 2.在jni目录下,再新建一个Android.mk文件 写入以下配置 LOCAL_PATH := $(call my-dir)//获取当前Android.mk所在目录 inclu

【Redis】Spring/SpringBoot 操作 Redis Java客户端

目录 操作 Redis Java客户端SpringBoot 操作Redis 步骤 操作 Redis Java客户端 1.Jedis 2.Lettuce(主流) <-Spring Data Redis SpringBoot 操作Redis 步骤 1.添加Redis 驱动依赖 2.设置Redis 连接信息 spring.redis.database0 spring.redis.port6379 spring.redis.host…

【Linux操作系统】深入理解系统调用中的read和write函数

在操作系统中&#xff0c;系统调用是用户程序与操作系统之间进行交互的重要方式。其中&#xff0c;read和write函数是常用的系统调用函数&#xff0c;用于在用户程序和操作系统之间进行数据的读取和写入。本文将深入介绍read和write函数的工作原理、用法以及示例代码&#xff0…

springboot异步任务

在Service类声明一个注解Async作为异步方法的标识 package com.qf.sping09test.service;import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service;Service public class AsyncService {//告诉spring这是一个异步的方法Asyncp…

使用gpt对对话数据进行扩增,对话数据扩增,数据增强

我们知道一个问题可以使用很多方式问&#xff0c;但都可以使用完全一样的回答&#xff0c;基于这个思路&#xff0c;我们可以很快的扩增我们的数据集。思路就是使用chatgpt或者gpt4生成类似问题&#xff0c;如下&#xff1a; 然后我们可以工程化这个过程&#xff0c;从而快速扩…

【Github】SourceTree技巧汇总

sourceTree登录github账户 会跳转到浏览器端 按照Git Flow 初始化仓库分支 克隆远程仓库到本地 推送变更到远程仓库 合并分支 可以看到目前的本地分支&#xff08;main、iOS_JS&#xff09;和远程分支&#xff08;origin/main、origin/HEAD、origin/iOS_JS&#xff09;目前所处…

【问题记录】antd icons报rev属性缺失错误

闲来无事将项目中的antd从v4升级到了v5&#xff0c;之前正常的页面中如有图标&#xff0c;如<PlusOutlined />&#xff0c;总是报以下错误&#xff1a; TS2741: Property rev is missing in type {} but required in type Pick<AntdIconProps, "name" …

如何实现Vue路由的二级菜单

目录 Vue路由&#xff08;一、二级路由&#xff09; 一级路由配置 二级路由配置 Vue中展示二级路由的默认模块/二级路由默认显示 Vue路由&#xff0c;二级路由及跳转 如何用vue实现二级菜单栏 ◼️ 相关参考资料 当朋友们看到这个文章时想必是想要了解vue路由二级菜单相…

React UI组件库

1 流行的开源React UI组件库 1 material-ui(国外) 官网: Material UI: React components based on Material Design github: GitHub - mui/material-ui: MUI Core: Ready-to-use foundational React components, free forever. It includes Material UI, which implements Go…