页面查询多项数据组合的线程池设计 | 京东云技术团队

背景

我们应对并发场景时一般会采用下面方式去预估线程池的线程数量,比如QPS需求是1000,平均每个任务需要执行的时间是t秒,那么我们需要的线程数是t * 1000。

但是在一些情况下,这个t是不好估算的,即便是估算出来了,在实际的线程环境上也需要进行验证和微调。比如在本文所阐述分页查询的数据项组合场景中。

1、数据组合依赖不同的上游接接口, 它们的响应时间参差不齐,甚至差距还非常大。有些接口支持批量查询而另一些则不支持批量查询。有些接口因为性能问题还需要考虑降级和平滑方案。

2、为了提升用户体验,这里的查询设计了动态列,因此每一次访问所需要组合的数据项和数量也是不同的。

因此这里如果需要估算出一个合理的t是不太现实的。

方案

一种可动态调节的策略,根据监控的反馈对线程池进行微调。整体设计分为装配逻辑线程池封装设计。

1、装配逻辑

查询结果,拆分分片(水平拆分),并行装配(垂直拆分),获得装配项列表(动态列), 并行装配每一项。

2、线程池封装

可调节的核心线程数、最大线程数、线程保持时间,队列大小,提交任务重试等待时间,提交任务重试次数。 固定异常拒绝策略。

调节参数:

字段名称说明
corePoolSize核心线程数参考线程池定义
maximumPoolSize最大线程数参考线程池定义
keepAliveTime线程存活时间参考线程池定义
queueSize队列长度参考线程池定义
resubmitSleepMillis提交任务重试等待时间添加任务被拒绝后重试时的等待时间
resubmitTimes提交任务重试次数添加任务被拒绝后重试添加的最大次数
    @Dataprivate static class PoolPolicy {/** 核心线程数 */private Integer corePoolSize;/** 最大线程数 */private Integer maximumPoolSize;/** 线程存活时间 */private Integer keepAliveTime;/** 队列容量 */private Integer queueSize;/** 重试等待时间 */private Long resubmitSleepMillis;/** 重试次数 */private Integer resubmitTimes;}

创建线程池:

线程池的创建考虑了动态的需求,满足根据压测结果进行微调的要求。首先缓存旧的线程池后再创建新的线程,当新的线程池创建成功后再去关闭旧的线程池。保证在这个替换过程中不影响正在执行的业务。线程池使用了中断策略,用户可以及时感知到系统繁忙并保证了系统资源占用的安全。

public void reloadThreadPool(PoolPolicy poolPolicy) {if (poolPolicy == null) {throw new RuntimeException("The thread pool policy cannot be empty.");}if (poolPolicy.getCorePoolSize() == null) {poolPolicy.setCorePoolSize(0);}if (poolPolicy.getMaximumPoolSize() == null) {poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);}if (poolPolicy.getKeepAliveTime() == null) {poolPolicy.setKeepAliveTime(60);}if (poolPolicy.getQueueSize() == null) {poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);}if (poolPolicy.getResubmitSleepMillis() == null) {poolPolicy.setResubmitSleepMillis(200L);}if (poolPolicy.getResubmitTimes() == null) {poolPolicy.setResubmitTimes(5);}// - 线程池策略没有变化直接返回已有线程池。ExecutorService original = this.executorService;this.executorService = new ThreadPoolExecutor(poolPolicy.getCorePoolSize(),poolPolicy.getMaximumPoolSize(),poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),new ThreadPoolExecutor.AbortPolicy());this.poolPolicy = poolPolicy;if (original != null) {original.shutdownNow();}
}

任务提交:

线程池封装对象中使用的线程池拒绝策略是AbortPolicy,因此在线程数和阻塞队列到达上限后会触发异常。另外在这里为了保证提交的成功率利用重试策略实现了一定程度的延迟处理,具体场景中可以结合业务特点进行适当的调节和配置。

public <T> Future<T> submit(Callable<T> task) {RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任务future = this.executorService.submit(task);exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {throw exception;}return future;
}

监控:

1、submit提交的监控

见代码中的「监控点①」,在submit方法中添加监控点,监控key的需要添线程池封装对象的线程名称前缀,用于区分具体的线程池对象。

「监控点①」用于监控添加任务的动作是否正常,以便对线程池对象及策略参数进行微调。

public <T> Future<T> submit(Callable<T> task) {// - 监控点①CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,UmpConstant.APP_NAME,UmpConstant.UMP_DISABLE_HEART,UmpConstant.UMP_ENABLE_TP);RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任务future = this.executorService.submit(task);exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {// - 监控点①Profiler.functionError(callerInfo);throw exception;}// - 监控点①Profiler.registerInfoEnd(callerInfo);return future;
}

2、线程池并行任务

见代码的「监控点②」,分别在添加任务和任务完成后。

「监控点②」实时统计在线程中执行的总任务数量,用于评估线程池的任务的数量的满载水平。

/** 任务并行数量统计 */
private AtomicInteger parallelTaskCount = new AtomicInteger(0);public <T> Future<T> submit(Callable<T> task) {RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任务future = this.executorService.submit(()-> {T rst = task.call();// - 监控点②log.info("{} - Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.decrementAndGet());return rst;});// - 监控点②log.info("{} + Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.incrementAndGet());exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {throw exception;}return future;
}

3、调节

线程池封装对象策略的调节时机

1)上线前基于流量预估的压测阶段;

2)上线后跟进监控数据和线程池中任务的满载水平进行人工微调,也可以通过JOB在指定的时间自动调整;

3)大促前依据往期大促峰值来调高相关参数。

线程池封装对象策略的调节经验

1)访问时长要求较低时,我们可以考虑调小线程数和阻塞队列,适当调大提交任务重试等待时间和次数,以便降低资源占用。

2)访问时长要求较高时,就需要调大线程数并保证相对较小的阻塞队列,调小提交任务的重试等待时间和次数甚至分别调成0和1(即关闭重试提交逻辑)。

作者:京东零售 王文明

来源:京东云开发者社区 转载请注明来源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/164223.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决 sharp: Installation error: unable to verify the first certificate

使用 plasmo 时报错如下&#xff1a; E:\chromeplugins>pnpm create plasmo ../.pnpm-store/v3/tmp/dlx-46852 | 2 ../.pnpm-store/v3/tmp/dlx-46852 | Progress: resolved 2, reused 2, downloaded 0, added 2, done &#x1f7e3; Plasmo v0.83.0 &…

华为---企业WLAN组网基本配置示例---AC+AP组网

ACAP组网所需的物理条件 1、无线AP---收发无线信号&#xff1b; 2、无线控制器(AC)---用来控制管理多个AP&#xff1b; 3、PoE交换机---能给AP实现网络连接和供电的交换机&#xff1b; 4、授权&#xff1a;默认AC管理的AP数量有限&#xff0c;买授权才能管控更多AP。 WLAN创建…

苹果开发者 Xcode发布TestFlight全流程

打包前注意事项 使用Xcode导出安装包之前&#xff0c;必须先确认账户的所有合约是否全部同意&#xff0c;如果有不同意的&#xff0c;在出包的时候会弹出报错 这是什么意思 这意味着您有一些需要在应用商店连接上验证的协议(protocol)/契约(Contract)。解决方案 连接到应用商店…

百度的新想象力在哪?

理解中国大模型&#xff0c;百度是一个窗口。这个窗口的特殊性不仅在于变化本身&#xff0c;而是在于百度本身就是那个窗口。 作者|皮爷 出品|产业家 沿着首钢园北区向西北步行10分钟&#xff0c;就能看到一个高约90米的大跳台&#xff0c;在工业园钢铁痕迹的印衬下&#…

Vue-vue项目Element-UI 表单组件内容要求判断

整体添加判断 <el-formref"ruleFormRef":model"formModel"class"demo-ruleForm"label-position"top"status-icon:rules"rules"><el-form-item label"姓名" prop"applyUsers" class"form-…

[云原生1.]Docker数据管理与Cgroups资源控制管理

文章目录 1. Docker的数据管理1.1 数据卷1.1.1 示例 1.2 数据卷容器 2. 容器互联3. Cgroups资源控制管理3.1 简介3.2 cgroups的主要功能3.3 cpu时间片的简单介绍3.4 对CPU使用的限制3.4.1 对CPU使用的限制&#xff08;基于单个容器&#xff09;3.4.2 对CPU使用的限制&#xff0…

vue3中computed的用法

一、完整代码 <template><div class"about"><h1>Computed的用法</h1><h3>姓:{{ person.firstName }}</h3><input type"text" v-model"person.firstName"><h3>名:{{ person.lastName }}</h3…

【PXIE301-211】基于PXIE总线的16路并行LVDS数据采集、4路低速、2路隔离RS422数据处理平台

板卡概述 PXIE301-211A是一款基于PXIE总线架构的16路高速LVDS、4路低速LVDS采集、2路隔离RS422数据处理平台&#xff0c;该平台板卡采用Xilinx的高性能Kintex 7系列FPGA XC7K325T作为实时处理器&#xff0c;实现各个接口之间的互联。板载1组64位的DDR3 SDRAM用作数据缓存。板卡…

【UE】纯蓝图实现:在游戏运行时设置关键点,然后让actor沿着关键点移动

前言 在上一篇博客(【UE】两步实现“从UI中拖出Actor放置到场景中”)中我们已经实现了如何从UI拖拽生成Actor ,本篇博客在此基础上要实现的是:从UI中拖出车,再从UI中拖出关键点,点击“开始移动”按钮后,车会沿着关键点移动,具体效果如下所示。 效果 步骤 1. 首先创建…

Flink学习之旅:(三)Flink源算子(数据源)

1.Flink数据源 Flink可以从各种数据源获取数据&#xff0c;然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。 数据集合数据文件Socket数据kafka数据自定义Source 2.案例 2.1.从集合中获取数据 创建 FlinkSource_List 类&#xff0c;再创建个 Student 类…

ps或游戏提示d3dcompiler_47.dll缺失怎么修复?常见的修复方法总结

在当今这个信息化的时代&#xff0c;计算机已经成为我们生活和工作中不可或缺的一部分。然而&#xff0c;随着软件的不断更新和升级&#xff0c;一些技术问题也时常困扰着我们。其中&#xff0c;d3dcompiler_47.dll缺失就是一个常见的问题。本文将详细介绍五种修复方案&#xf…

性能超越 Clickhouse | 物联网场景中的毫秒级查询案例

1 物联网应用场景简介 物联网&#xff08;Internet of Things&#xff0c;简称 IoT&#xff09;是指通过各种信息传感、通信和 IT 技术来实时连接、采集、监管海量的传感设备&#xff0c;从而实现对现实世界的精确感知和快速响应&#xff0c;继而实现自动化、智能化管理。在查…

Visual Studio2019 与 MySQL连接 版本关系

Refer: VS 连接MySQL | mysql-for-visualstudio 的安装-CSDN博客 【精选】用VS2019&#xff08;C#&#xff09;连接MYSQL(从0入门&#xff0c;手把手教学&#xff09;_mysql-for-visualstudio-1.2.9.msi_Flying___rabbit的博客-CSDN博客 一、工具&#xff1a;VS2019需要连接M…

gin框架39--重构 BasicAuth 中间件

gin框架39--重构 BasicAuth 中间件 介绍gin BasicAuth 解析自定义newAuth实现基础认证注意事项说明 介绍 每当我们打开一个网址的时候&#xff0c;会自动弹出一个认证界面&#xff0c;要求我们输入用户名和密码&#xff0c;这种BasicAuth是最基础、最常见的认证方式&#xff0…

RabbitMQ整理

MQ(Message Queue)&#xff1a;是队列&#xff0c;也是跨进程的通信机制&#xff0c;用于上下游传递信息 FIFO(First In First Out)&#xff1a;先进先出 RabbitMQ访问&#xff1a;http://127.0.0.1:15672/ 默认账号密码&#xff1a;guest 优势&#xff1a;流量削峰&#x…

CRC16计算FC(博途SCL语言)

CRC8的计算FC,相关链接请查看下面文章链接: 博途SCL CRC8 计算FC(计算法)_博途怎么计算crc_RXXW_Dor的博客-CSDN博客关于CRC8的计算网上有很多资料和C代码,这里不在叙述,这里主要记录西门子的博途SCL完成CRC8的计算过程, CRC校验算法,说白了,就是把需要校验的数据与多项式…

CCF CSP认证 历年题目自练Day34

题目一 试题编号&#xff1a; 202303-1 试题名称&#xff1a; 田地丈量 时间限制&#xff1a; 1.0s 内存限制&#xff1a; 512.0MB 问题描述&#xff1a; 问题描述 西西艾弗岛上散落着 n 块田地。每块田地可视为平面直角坐标系下的一块矩形区域&#xff0c;由左下角坐标 (x1,…

xml schema中的all元素

说明 xml schema中的all元素表示其中的子元素可以按照任何顺序出现&#xff0c;每个元素可以出现0次或者1次。 https://www.w3.org/TR/xmlschema-1/#element-all maxOccurs的默认值是1&#xff0c;minOccurs 的默认值是1。 举例 <element name"TradePriceRequest&…

丈母娘说:有编制的不如搞编程的

10月17日百度世界大会召开&#xff0c;据说文心大模型又牛X了&#xff0c;综合水平相比GPT4毫不逊色&#xff0c;真是个让人激动的消息&#xff0c;国产大模型的进展可以说是日新月异&#xff0c;我这耳朵边一直响彻四个字&#xff1a;遥遥领先。 不过今天我关注的重点不是什么…

ArcGIS在VUE框架中的构建思想

项目快要上线了&#xff0c;出乎意料的有些空闲时间。想着就把其他公司开发的一期代码里面&#xff0c;把关于地图方面的代码给优化一下。试运行的时候&#xff0c;客户说控制台有很多飘红的报错&#xff0c;他们很在意&#xff0c;虽然很不情愿&#xff0c;但能改的就给改了吧…