Spring Boot 线程池自定义拒绝策略:解决任务堆积与丢失问题

如何通过自定义线程池提升系统稳定性

背景

在高并发系统中,线程池管理至关重要。默认线程池可能导致:

  1. 资源浪费(创建过多线程导致 OOM)
  2. 任务堆积(队列满后任务被拒绝)
  3. 任务丢失(默认拒绝策略丢弃任务
    为了防止这些问题,我们使用 Spring Boot 自定义线程池,并优化 异常处理 和 拒绝策略。

线程池方案设计

在 ExecutorConfig 类中,我们定义了两个线程池:

  1. myExecutor:用于普通任务,采用CallerRunsPolicy 避免任务丢失。
  2. oneExecutor:用于信号计算任务(单线程模式),具有 自定义异常处理 和 阻塞式拒绝策略。

代码解析

线程池 myExecutor(通用任务池)

@Bean(name = "myExecutor")
public Executor myExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix("signal-executor-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;
}

设计要点:
CallerRunsPolicy:线程池满了,主线程执行任务,防止丢失但可能影响性能。

线程池 oneExecutor(单线程计算池)

@Bean(name = "oneExecutor")
public Executor oneExecutor() {ThreadFactory threadFactory = new BasicThreadFactory.Builder().uncaughtExceptionHandler(new MyThreadException()).namingPattern("one-thread-%s").build();ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setQueueCapacity(1);executor.setThreadFactory(threadFactory);executor.setThreadGroup(new ThreadGroup("1"));executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.initialize();return executor;
}

设计要点:
单线程池(保证任务顺序执行),如果无须,那就按照当前的服务节点配置来设置参数
自定义异常处理(防止线程因异常崩溃)
自定义拒绝策略(任务队列满时阻塞等待)

自定义异常处理

class MyThreadException implements Thread.UncaughtExceptionHandler {@Overridepublic void uncaughtException(Thread t, Throwable e) {log.error("异常: {},线程: {}", ExceptionUtils.getStackTrace(e), t.getName());}
}

作用:防止线程因未捕获异常直接终止,提升系统稳定性。当然这个是处理线程池中子任务处理业务逻辑的时候发生业务异常的处理方式,除此之外还有其他的解决方案

异常处理
  • afterExecute() 处理异常(可扩展) :用于处理执行过程中抛出的异常
  • uncaughtExceptionHandler 处理未捕获异常(默认 JVM 打印堆栈): 用于处理线程未捕获的异常;
  • RejectedExecutionHandler 处理任务拒绝:处理任务被拒绝的情况。

处理顺序:

  1. 当任务执行时,如果任务抛出异常,它会首先被 afterExecute() 捕获,并且你可以在这里进行进一步的处理。
  2. 如果任务中的异常没有被 afterExecute() 捕获或处理,且是未捕获异常,它会交由 uncaughtExceptionHandler 进行处理。
  3. RejectedExecutionHandler 是处理线程池拒绝接受新任务的情况,这通常和任务执行过程中的异常无关,主要处理线程池饱和时的情况。
    注意:beforeExecute() 在任务开始执行前调用,通常用于准备工作;
    异常处理上,beforeExecute() 不会直接处理任务执行过程中的异常,但可以捕获并处理自己内部的异常;

相关源码分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1️⃣ 线程池当前线程数 < corePoolSize,则尝试新增核心线程执行任务
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2️⃣ 线程池已满,尝试加入工作队列
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command); // 任务队列中的任务被拒绝
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 防止线程池为空,确保有线程执行任务
}
// 3️⃣ 线程池满且队列满,尝试新增非核心线程
else if (!addWorker(command, false))
reject(command); // 线程池已满,拒绝任务
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 1️⃣ 执行任务
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // ⚠ 任务执行点
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown); // 2️⃣ 任务执行后的扩展方法
}
task = null;
w.completedTasks++;
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 3️⃣ 任务异常退出,删除该线程
}
}

自定义拒绝策略-重新放回队列中
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.info("队列已满,阻塞等待...");executor.getQueue().put(r);log.info("任务已加入队列");}} catch (Exception e) {log.error("拒绝策略异常", e);}}
}

作用:
默认拒绝策略丢弃任务,而此策略会阻塞等待,确保任务不丢失。
适用于任务量较大,但不能丢失任务的场景(如消息队列处理)

自定义拒绝策略-主线程执行
    /*** 自定义线程池,防止使用默认线程池导致内存溢出** @param* @return* @author bu.junjie* @date 2021/11/10 10:00*/@Bean(name = "myExecutor")public Executor myExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix("signal-executor-");// 使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行,阻塞主线程executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}

适用场景

✅ 高并发请求(如 HTTP 任务)
✅ 后台数据处理(如日志分析、批量计算)
✅ 长时间任务(如大文件处理、消息队列消费)

总结

  • 自定义线程池 防止资源浪费,提升吞吐量。
  • 异常处理 避免线程因未捕获异常而终止。
  • 优化拒绝策略 防止任务丢失,提高系统可靠性。

线程池优化是高并发系统的关键,希望本篇博客能帮助你更好地理解和应用线程池! 🚀🚀🚀

完整代码示例

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import javax.annotation.Resource;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;/*** 线程池配置参数** @version 1.0.0* @createTime 2025-11-09 14:01*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {@Resourceprivate ThreadProperties threadProperties;/*** 自定义线程池,防止使用默认线程池导致内存溢出** @param* @return* @author bu.junjie* @date 2021/11/10 10:00*/@Bean(name = "myExecutor")public Executor myExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix("signal-executor-");// 使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行,阻塞主线程executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}/*** 信号计算时的线程池(1号线程池)** @param* @return* @author bu.junjie* @date 2022/1/5 13:01*/@Bean(name = "oneExecutor")public Executor oneExecutor() {ThreadFactory threadFactory = new BasicThreadFactory.Builder().uncaughtExceptionHandler(new MyThreadException()).namingPattern("one-thread-%s").build();ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setThreadFactory(threadFactory);executor.setQueueCapacity(1);executor.setThreadGroup(new ThreadGroup("1"));executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.initialize();return executor;}class MyThreadException implements Thread.UncaughtExceptionHandler {/*** Method invoked when the given thread terminates due to the* given uncaught exception.* <p>Any exception thrown by this method will be ignored by the* Java Virtual Machine.** @param t the thread* @param e the exception*/@Overridepublic void uncaughtException(Thread t, Throwable e) {log.error("MyThreadException is   exception=【{}】,Thread = 【{}】", ExceptionUtils.getStackTrace(e), t.getName());}}/*** 拒绝策略优化** @param* @author bu.junjie* @date 2022/1/8 14:06* @return*/public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {// 核心改造点,由blockingqueue的offer改成put阻塞方法if (!executor.isShutdown()) {long start = System.currentTimeMillis();log.info("当前阻塞队列已满开始请求存放队列束!!!");executor.getQueue().put(r);log.info("存放阻塞队列成功,阻塞时间time = 【{}】", System.currentTimeMillis() - start);}} catch (Exception e) {e.printStackTrace();}}}}

思考

为什么拒绝策略要重新抛出异常?

我们会发现默认的四种拒绝策略在处理完业务逻辑之后还会重新抛出异常,就算你是自定义的拒绝策略也需要重新抛出异常,为什么呢?不抛出会怎么样?

如果不抛出异常,调用方(业务代码)无法感知任务被拒绝,可能导致任务丢失或业务逻辑异常。

场景分析

当线程池队列满了时,会触发 rejectedExecution 方法。如果我们只是记录日志,而不抛出异常:

  • 主线程会继续执行,但任务并未真正执行,业务方无法感知到这个问题。
  • 可能导致数据丢失,尤其是在关键业务(如支付、订单、消息处理)场景中。
重新抛出异常的好处

✅ 保证调用方可以感知任务拒绝,决定是否降级处理、重试或报警。
✅ 防止静默丢失任务,保证业务的可靠性。
✅ 与 Spring 线程池默认行为保持一致,防止意外吞掉异常。

代码示例

❌ 错误示例(未抛出异常,可能导致任务丢失)

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.warn("队列已满,任务阻塞等待...");executor.getQueue().put(r); // 可能抛出异常log.info("任务已放入队列");}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 仅恢复中断状态,但未通知调用方}}
}

问题:
调用方不会收到异常,以为任务已经成功执行,但其实可能丢失了。
例如,在支付系统中,如果订单更新任务丢失,可能导致订单状态未更新。

✅ 正确示例(重新抛出异常,保证调用方可感知)

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.warn("队列已满,阻塞等待...");executor.getQueue().put(r);log.info("任务成功进入队列");return; // 任务成功加入队列后不需要抛异常}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 恢复线程中断状态throw new RejectedExecutionException("任务提交被中断", e);} catch (Exception e) {log.error("任务拒绝策略发生异常", e);throw new RejectedExecutionException("自定义拒绝策略异常", e);}}
}

改进点:
任务成功放入队列时不会抛异常,避免不必要的错误。
如果 put() 失败,抛出 RejectedExecutionException,让业务方感知。
捕获 InterruptedException 并恢复中断状态,避免影响后续任务。
其实这个原因和为什么需要恢复线程中断一样的逻辑,也是为了让调用方感知到

业务方如何处理异常?

如果 rejectedExecution 抛出 RejectedExecutionException,业务代码可以捕获异常并进行降级,例如:

try {executor.execute(task);
} catch (RejectedExecutionException e) {log.error("线程池已满,任务执行失败,进行降级处理", e);// 业务降级策略,例如:saveToDatabaseForLaterProcessing(task);
}

降级方案:如果线程池拒绝任务,可以存入 数据库、MQ 或 重试队列,避免任务丢失。

结论

🚀 必须重新抛出异常,否则:

  • 任务可能悄悄丢失,业务方无法感知。
  • 可能影响数据一致性(如支付、订单、日志处理)。
  • 业务代码无法主动补救(重试、降级等)。

最佳实践:

  • 成功放入队列 → 不抛异常
  • 任务无法处理 → 抛出 RejectedExecutionException,让调用方感知
    这样可以既保证任务不丢失,又确保调用方有能力处理拒绝任务!🔥

自定义拒绝策略put()方法?

其实默认拒绝策略是offer()方法是非阻塞的,也就是只要队列中的任务只要有,那就去创建子线程,直至触发拒绝策略
✅ 正确示例

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {System.out.println("队列已满,阻塞等待...");executor.getQueue().put(r);  // 阻塞等待队列有空位System.out.println("任务重新加入队列:" + r.toString());} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RejectedExecutionException("任务提交失败,线程被中断", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/15787.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

KITE提示词框架:引导大语言模型的高效新工具

大语言模型的应用日益广泛。然而&#xff0c;如何确保这些模型生成的内容在AI原生应用中符合预期&#xff0c;仍是一个需要不断探索的问题。以下内容来自于《AI 原生应用开发&#xff1a;提示工程原理与实战》一书&#xff08;京东图书&#xff1a;https://item.jd.com/1013604…

性能优化中的系统架构优化

系统架构优化是性能优化的一个重要方面&#xff0c;它涉及到对整个IT系统或交易链上各个环节的分析与改进。通过系统架构优化&#xff0c;可以提高系统的响应速度、吞吐量&#xff0c;并降低各层之间的耦合度&#xff0c;从而更好地应对市场的变化和需求。业务增长导致的性能问…

【学习笔记】计算机网络(三)

第3章 数据链路层 文章目录 第3章 数据链路层3.1数据链路层的几个共同问题3.1.1 数据链路和帧3.1.2 三个基本功能3.1.3 其他功能 - 滑动窗口机制 3.2 点对点协议PPP(Point-to-Point Protocol)3.2.1 PPP 协议的特点3.2.2 PPP协议的帧格式3.2.3 PPP 协议的工作状态 3.3 使用广播信…

机器学习 - 理解偏差-方差分解

为了避免过拟合&#xff0c;我们经常会在模型的拟合能力和复杂度之间进行权衡。拟合能力强的模型一般复杂度会比较高&#xff0c;容易导致过拟合。相反&#xff0c;如果限制模型的复杂度&#xff0c;降低其拟合能力&#xff0c;又可能会导致欠拟合。因此&#xff0c;如何在模型…

【STM32】ADC

本次实现的是ADC实现数字信号与模拟信号的转化&#xff0c;数字信号时不连续的&#xff0c;模拟信号是连续的。 1.ADC转化的原理 模拟-数字转换技术使用的是逐次逼近法&#xff0c;使用二分比较的方法来确定电压值 当单片机对应的参考电压为3.3v时&#xff0c;0~ 3.3v(模拟信号…

DeepSeek 助力 Vue 开发:打造丝滑的步骤条

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 Deep…

基于Python的人工智能驱动基因组变异算法:设计与应用(下)

3.3.2 数据清洗与预处理 在基因组变异分析中,原始数据往往包含各种噪声和不完整信息,数据清洗与预处理是确保分析结果准确性和可靠性的关键步骤。通过 Python 的相关库和工具,可以有效地去除噪声、填补缺失值、标准化数据等,为后续的分析提供高质量的数据基础。 在基因组…

elasticsearch安装插件analysis-ik分词器(深度研究docker内elasticsearch安装插件的位置)

最近在学习使用elasticsearch&#xff0c;但是在安装插件ik的时候遇到许多问题。 所以在这里开始对elasticsearch做一个深度的研究。 首先提供如下链接&#xff1a; https://github.com/infinilabs/analysis-ik/releases 我们下载elasticsearch-7-17-2的Linux x86_64版本 …

linux部署ollama+deepseek+dify

Ollama 下载源码 curl -L https://ollama.com/download/ollama-linux-amd64.tgz -o ollama-linux-amd64.tgz sudo tar -C /usr -xzf ollama-linux-amd64.tgz启动 export OLLAMA_HOST0.0.0.0:11434 ollama serve访问ip:11434看到即成功 Ollama is running 手动安装deepseek…

力扣 单词拆分

动态规划&#xff0c;字符串截取&#xff0c;可重复用&#xff0c;集合类。 题目 单词可以重复使用&#xff0c;一个单词可用多次&#xff0c;应该是比较灵活的组合形式了&#xff0c;可以想到用dp&#xff0c;遍历完单词后的状态的返回值。而这里的wordDict给出的是list&…

【JVM详解二】常量池

一、常量池概述 JVM的常量池主要有以下几种&#xff1a; class文件常量池运行时常量池字符串常量池基本类型包装类常量池 它们相互之间关系大致如下图所示&#xff1a; 每个 class 的字节码文件中都有一个常量池&#xff0c;里面是编译后即知的该 class 会用到的字面量与符号引…

企业数据集成案例:吉客云销售渠道到MySQL

测试-查询销售渠道信息-dange&#xff1a;吉客云数据集成到MySQL的技术案例分享 在企业的数据管理过程中&#xff0c;如何高效、可靠地实现不同系统之间的数据对接是一个关键问题。本次我们将分享一个具体的技术案例——通过轻易云数据集成平台&#xff0c;将吉客云中的销售渠…

CTFHub-RCE系列wp

目录标题 引言什么是RCE漏洞 eval执行文件包含文件包含php://input读取源代码远程包含 命令注入无过滤过滤cat过滤空格过滤目录分隔符过滤运算符综合过滤练习 引言 题目共有如下类型 什么是RCE漏洞 RCE漏洞&#xff0c;全称是Remote Code Execution漏洞&#xff0c;翻译成中文…

深度学习之神经网络框架搭建及模型优化

神经网络框架搭建及模型优化 目录 神经网络框架搭建及模型优化1 数据及配置1.1 配置1.2 数据1.3 函数导入1.4 数据函数1.5 数据打包 2 神经网络框架搭建2.1 框架确认2.2 函数搭建2.3 框架上传 3 模型优化3.1 函数理解3.2 训练模型和测试模型代码 4 最终代码测试4.1 SGD优化算法…

STM32自学记录(十)

STM32自学记录 文章目录 STM32自学记录前言一、USART杂记二、实验1.学习视频2.复现代码 总结 前言 USART 一、USART杂记 通信接口&#xff1a;通信的目的&#xff1a;将一个设备的数据传送到另一个设备&#xff0c;扩展硬件系统。 通信协议&#xff1a;制定通信的规则&#x…

Linux --- 如何安装Docker命令并且使用docker安装Mysql【一篇内容直接解决】

目录 安装Docker命令 1.卸载原有的Docker&#xff1a; 2.安装docker&#xff1a; 3.启动docker&#xff1a; 4.配置镜像加速&#xff1a; 使用Docker安装Mysql 1.上传文件&#xff1a; 2.创建目录&#xff1a; 3.运行docker命令&#xff1a; 4.测试&#xff1a; 安装…

Linux磁盘空间使用率100%(解决删除文件后还是显示100%)

本文适用于&#xff0c;删除过了对应的数据文件&#xff0c;查看还是显示使用率100%的情况 首先使用df -h命令查看各个扇区所占用的情况 一、先对系统盘下所有文件大小进行统计&#xff0c;是否真的是数据存储以达到了磁盘空间 在对应的扇区路径下使用du -sh * | sort -hr 命…

Python——批量图片转PDF(GUI版本)

目录 专栏导读1、背景介绍2、库的安装3、核心代码4、完整代码总结专栏导读 🌸 欢迎来到Python办公自动化专栏—Python处理办公问题,解放您的双手 🏳️‍🌈 博客主页:请点击——> 一晌小贪欢的博客主页求关注 👍 该系列文章专栏:请点击——>Python办公自动化专…

IDEA查看项目依赖包及其版本

一.IDEA将现有项目转换为Maven项目 在IntelliJ IDEA中,将现有项目转换为Maven项目是一个常见的需求,可以通过几种不同的方法来实现。Maven是一个强大的构建工具,它可以帮助自动化项目的构建过程,管理依赖关系,以及其他许多方面。 添加Maven支持 如果你的项目还没有pom.xm…

HTML 属性

HTML 属性 HTML(超文本标记语言)是构建网页的基础,而HTML属性则是赋予HTML元素额外功能和样式的关键。本文将详细介绍HTML属性的概念、常用属性及其应用,帮助您更好地理解和使用HTML。 一、HTML属性概述 HTML属性是HTML元素的组成部分,用于描述元素的状态或行为。属性总…