【智能排班系统】快速消费线程池

文章目录

  • 线程池介绍
    • 线程池核心参数
      • 核心线程数(Core Pool Size)
      • 最大线程数(Maximum Pool Size)
      • 队列(Queue)
      • 线程空闲超时时间(KeepAliveTime)
      • 拒绝策略(RejectedExecutionHandler)
    • 线程池执行流程
  • 快速消费线程池
    • 快速消费线程池组件
      • 相关依赖
      • 快速消费队列
      • 快速消费线程池
      • 获取配置文件的配置
      • 配置线程池Bean到容器中
  • 说明

线程池介绍

线程池作为多线程编程中的重要工具,旨在通过复用已创建的线程来减少线程创建与销毁的开销,提升系统资源利用率和并发性能。要有效地使用线程池,理解和配置其核心参数至关重要。

线程池核心参数

创建一个线程池的代码如下,可以看到构造方法需要传递几个参数,下文会详细展示每个参数的含义:

// 导包
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;// 创建线程池
return new ThreadPoolExecutor(poolConfigProperties.getCoreSize(),poolConfigProperties.getMaxSize(),poolConfigProperties.getKeepAliveTime(),TimeUnit.SECONDS,//队列的最大容量new LinkedBlockingDeque<>(600),//使用默认的工程Executors.defaultThreadFactory(),//使用拒绝新来的拒绝策略new ThreadPoolExecutor.CallerRunsPolicy()
);

核心线程数(Core Pool Size)

核心线程数是指线程池在初始化时创建并保持活动状态的线程数量。即使这些线程当前没有任务执行,它们也不会被回收。核心线程数通常根据系统资源、预期并发负载和任务特性来设定。核心线程在池中长期存在,能够快速响应新提交的任务,减少任务提交后的等待时间。

最大线程数(Maximum Pool Size)

最大线程数限制了线程池能同时容纳的线程总数。当核心线程数无法满足当前任务需求时,线程池会创建额外的线程直至达到最大线程数。超过这个阈值后,线程池将采取拒绝策略处理新提交的任务。合理设置最大线程数,既能防止资源过度消耗导致系统过载,又能确保在高并发场景下有足够的线程处理任务。

队列(Queue)

线程池通常配合任务队列使用,用于暂存待处理的任务。当所有核心线程都处于忙碌状态且未达到最大线程数时,新提交的任务会被放入队列中等待。常见的队列类型包括无界队列(如 LinkedBlockingQueue)、有界队列(如 ArrayBlockingQueue)和优先级队列(如 PriorityBlockingQueue)。队列的选择和容量大小直接影响线程池的阻塞策略和任务调度效率。

线程空闲超时时间(KeepAliveTime)

当线程池中存在超出核心线程数的非核心线程,并且这些线程在一段时间内(即 KeepAliveTime)没有执行任何任务,则会自动终止。这个参数有助于释放闲置资源,避免资源浪费。对于长期存在大量任务的系统,可以适当增大或关闭这个超时时间。

拒绝策略(RejectedExecutionHandler)

当线程池和队列都无法接纳新任务时,需要采用拒绝策略来处理。常见的拒绝策略有:

  • AbortPolicy:默认策略,直接抛出 RejectedExecutionException。
  • CallerRunsPolicy:由提交任务的线程自行执行任务。
  • DiscardPolicy:默默地丢弃任务,不抛出异常也不执行。
  • DiscardOldestPolicy:丢弃队列中最旧的任务,尝试提交新任务。

在这里插入图片描述

线程池执行流程

  • 初始阶段线程池创建并启动核心线程数指定数量的线程。此时,如果有任务提交,直接由这些核心线程执行。

  • 核心线程饱和当所有核心线程都在执行任务且任务队列尚未满时,新提交的任务被放入队列等待

  • 队列满载:若任务提交速率持续高于线程处理速度,队列达到其容量上限。此时,线程池开始创建新的线程(不超过最大线程数),直接执行新提交的任务。

  • 达到最大线程数:若任务增长仍然无法遏制,线程池达到最大线程数。此时,新提交的任务将触发拒绝策略

  • 任务减少与线程收缩:当任务提交速率降低,线程池中的线程开始完成任务并变得空闲。对于非核心线程,若在 KeepAliveTime 时间内未获得新任务,将被终止。系统逐渐回归到更低的线程数,直至仅保留核心线程

在任务量增长的过程中,线程池通过动态调整线程数量和利用任务队列,既保证了系统的响应能力,又防止了资源过度消耗。

快速消费线程池

快速消费线程池通过对上述线程池进行改造,当核心线程饱和时,再提交的任务不是先加入到队列中,而是直接创建非核心线程来执行新提交任务。快速消费线程池可以加快任务的执行,减少任务的堆积。

快速消费线程池组件

在这里插入图片描述

相关依赖

<dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><scope>provided</scope></dependency>
</dependencies>

快速消费队列

该类继承自LinkedBlockingQueue,并对其offer方法进行定制,以配合EagerThreadPoolExecutor实现更灵活的任务调度策略。主要目的是在满足特定条件时,促使线程池创建非核心线程以快速处理任务,而非直接将任务放入队列等待处理。

import lombok.Data;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;/*** 快速消费任务队列*/
@Data
public class EagerTaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {private EagerThreadPoolExecutor executor;/*** 构造函数,传入队列容量参数,用于初始化LinkedBlockingQueue。** @param capacity 队列的最大容量*/public EagerTaskQueue(int capacity) {super(capacity);}/*** 重写父类LinkedBlockingQueue的offer方法,实现自定义的任务入队逻辑* 当没有到达最大线程时,返回false,让其创建非核心线程** @param runnable 待添加的任务对象* @return 如果任务成功加入队列或触发线程池创建非核心线程,则返回true;否则返回false*/@Overridepublic boolean offer(Runnable runnable) {// 获取当前线程池的线程数量int currentPoolThreadSize = executor.getPoolSize();// 检查是否有核心线程处于空闲状态(已提交任务数小于当前线程数)if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {// 如果有核心线程正在空闲,将任务加入阻塞队列,由核心线程进行处理任务return super.offer(runnable);}// 检查当前线程池线程数量是否小于最大线程数if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
//            System.out.println("线程池线程数量小于最大线程数,返回 False,线程池会创建非核心线程");// 当前线程池线程数量小于最大线程数,返回false,触发线程池创建非核心线程处理任务return false;}// 如果当前线程池数量大于最大线程数,任务加入阻塞队列,等待线程池中的已有线程处理return super.offer(runnable);}/**** @param runnable      待添加的任务对象* @param timeout       等待加入队列的超时时间* @param timeUnit      超时时间单位* @return              如果任务成功加入队列或触发线程池创建非核心线程,则返回true;否则返回false* @throws InterruptedException 如果在等待过程中线程被中断* @throws RejectedExecutionException 如果线程池已关闭*/public boolean retryOffer(Runnable runnable, long timeout, TimeUnit timeUnit) throws InterruptedException {// 如果线程池已关闭,则抛出RejectedExecutionException异常。if (executor.isShutdown()) {throw new RejectedExecutionException("Executor is shutdown!");}return super.offer(runnable, timeout, timeUnit);}
}

快速消费线程池

该类继承自ThreadPoolExecutor,并对其进行定制,以实现更灵活的任务调度策略。主要特点包括:

  • 使用自定义的EagerTaskQueue作为工作队列,支持根据线程池状态动态调整任务入队逻辑。
  • 维护正在处理的任务数量计数器(submittedTaskCount),以便EagerTaskQueue判断是否有核心线程处于空闲状态。
  • 在execute方法中,处理任务提交失败的情况,尝试将任务重新投递到队列或使用拒绝策略。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 快速消费线程池*/
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {/*** 使用AtomicInteger记录当前正在处理的任务数量,提供线程安全的计数操作。*/private final AtomicInteger submittedTaskCount = new AtomicInteger(0);/*** 构造函数,接受线程池相关的配置参数,包括核心线程数、最大线程数、线程存活时间、时间单位、工作队列、线程工厂和拒绝策略。* 工作队列类型为自定义的EagerTaskQueue,用于实现特殊的任务入队逻辑。** @param corePoolSize         核心线程数* @param maximumPoolSize      最大线程数* @param keepAliveTime        线程空闲后的存活时间* @param unit                 时间单位* @param workQueue            工作队列,类型为EagerTaskQueue* @param threadFactory        线程工厂,用于创建新线程* @param handler              拒绝策略,当线程池和队列无法接受新任务时的处理方式*/public  EagerThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,EagerTaskQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}/*** 创建一个EagerThreadPoolExecutor实例的便捷方法* 包括创建EagerTaskQueue并设置其与线程池的关联** @param corePoolSize         核心线程数* @param maximumPoolSize      最大线程数* @param keepAliveTime        线程空闲后的存活时间* @param unit                 时间单位* @param queueCapacity        队列容量* @param threadFactory        线程工厂,用于创建新线程* @param handler              拒绝策略,当线程池和队列无法接受新任务时的处理方式* @return                     创建的EagerThreadPoolExecutor实例*/public static EagerThreadPoolExecutor createEagerThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,int queueCapacity,ThreadFactory threadFactory,RejectedExecutionHandler handler) {EagerTaskQueue eagerTaskQueue = new EagerTaskQueue(queueCapacity);EagerThreadPoolExecutor eagerThreadPoolExecutor = new EagerThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, eagerTaskQueue, threadFactory, handler);eagerTaskQueue.setExecutor(eagerThreadPoolExecutor);return eagerThreadPoolExecutor;}/*** 获取当前正在处理的任务数量。** @return 当前正在处理的任务数量*/public int getSubmittedTaskCount() {return submittedTaskCount.get();}/*** 重写父类的afterExecute方法,当任务执行完成后,将正在执行的任务数量减一。* 这是ThreadPoolExecutor提供的钩子方法,用于在任务执行结束后进行清理或其他操作。** @param r       执行完毕的任务* @param t       执行过程中抛出的异常(如果有的话)*/@Overrideprotected void afterExecute(Runnable r, Throwable t) {// 任务执行完成,将正在执行数量-1submittedTaskCount.decrementAndGet();}/*** 重写父类的execute方法,用于提交任务到线程池。* 在提交任务之前,先将正在执行的任务数量加一。若提交失败,根据具体情况尝试重新投递任务或使用拒绝策略。** @param command 待提交的任务* @throws RejectedExecutionException 如果任务无法被接受,且无法重新投递到队列*/@Overridepublic void execute(Runnable command) {
//        System.out.println("使用快速消费线程池执行任务");// 将正在执行任务数量 + 1submittedTaskCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException ex) {// 任务被拒绝,间隔一定时间,将任务重新投递到队列EagerTaskQueue eagerTaskQueue = (EagerTaskQueue) super.getQueue();try {// 将任务重新投递到队列if (!eagerTaskQueue.retryOffer(command, 10, TimeUnit.MILLISECONDS)) {// 队列已满,使用拒绝策略,并减少计数submittedTaskCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.", ex);}} catch (InterruptedException iex) {// 重试失败,将正在执行任务数量 - 1submittedTaskCount.decrementAndGet();throw new RejectedExecutionException(iex);}} catch (Exception ex) {// 执行失败,将正在执行任务数量 - 1submittedTaskCount.decrementAndGet();throw ex;}}
}

获取配置文件的配置

在这里插入图片描述

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;@ConfigurationProperties(prefix = "sss.thread")
@Component//将该配置放到容器中
@Data
public class ThreadPoolConfigProperties {private Integer coreSize;private Integer maxSize;private Integer keepAliveTime;}

配置线程池Bean到容器中

import com.dam.eager.EagerThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class MyThreadConfig {/*** @param poolConfigProperties 如果需要使用到ThreadPoolConfigProperties,一定要使用Component将其加入到容器中* @return*/@Beanpublic ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties poolConfigProperties) {// 普通线程池
//        return new ThreadPoolExecutor(poolConfigProperties.getCoreSize(),
//                poolConfigProperties.getMaxSize(),
//                poolConfigProperties.getKeepAliveTime(),
//                TimeUnit.SECONDS,
//                //队列的最大容量
//                new LinkedBlockingDeque<>(600),
//                //使用默认的工程
//                Executors.defaultThreadFactory(),
//                //使用拒绝新来的拒绝策略
//                new ThreadPoolExecutor.CallerRunsPolicy()
//        );// 快速消费线程池return EagerThreadPoolExecutor.createEagerThreadPoolExecutor(poolConfigProperties.getCoreSize(),poolConfigProperties.getMaxSize(),poolConfigProperties.getKeepAliveTime(),TimeUnit.SECONDS,// 队列的最大容量600,// 使用默认的工程Executors.defaultThreadFactory(),// 使用拒绝新来的拒绝策略new ThreadPoolExecutor.CallerRunsPolicy());}
}

说明

快速线程池的实现参考马哥 12306 的代码,代码仓库为12306,该项目含金量较高,有兴趣的同学可以去学习一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/296749.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Raven:一款功能强大的CICD安全分析工具

关于Raven Raven是一款功能强大的CI/CD安全分析工具&#xff0c;该工具旨在帮助广大研究人员对GitHub Actions CI工作流执行大规模安全扫描&#xff0c;并将发现的数据解析并存储到Neo4j数据库中。 Raven&#xff0c;全称为Risk Analysis and Vulnerability Enumeration for C…

jQuery(一)

文章目录 1. 基本介绍2.原理示意图3.快速入门1.下载jQuery2.创建文件夹&#xff0c;放入jQuery3.引入jQuery4.代码实例 4.jQuery对象与DOM对象转换1.基本介绍2.dom对象转换JQuery对象3.JQuery对象转换dom对象4.jQuery对象获取数据获取value使用val&#xff08;&#xff09;获取…

HCIA-RS基础-VLAN路由

目录 VLAN 路由1. 什么是 VLAN 路由2. VLAN 路由的原理及配置3. VLAN 的缺点和 VLAN Trunking4. 单臂路由配置 总结 VLAN 路由 1. 什么是 VLAN 路由 VLAN 路由是指在虚拟局域网&#xff08;VLAN&#xff09;之间进行路由转发的过程。传统的 VLAN 配置只能在同一个 VLAN 内进行…

【LeetCode热题100】51. N 皇后(回溯)

一.题目要求 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;返回所有不同的 n 皇后问题 的解决方…

程序员沟通之道:TCP与UDP之辩,窥见有效沟通的重要性(day19)

程序员沟通的重要性&#xff1a; 今天被师父骂了一顿&#xff0c;说我不及时回复他&#xff0c;连最起码的有效沟通都做不到怎么当好一个程序员&#xff0c;想想还挺有道理&#xff0c;程序员需要知道用户到底有哪些需求&#xff0c;用户与程序员之间的有效沟通就起到了关键性作…

Spring Boot 整合 OSS 实现文件上传

一、开通 OSS OSS 也就是 Object Storage Service&#xff0c;是阿里云提供的一套对象存储服务&#xff0c;国内的竞品还有七牛云的 Kodo和腾讯云的COS。 第一步&#xff0c;登录阿里云官网&#xff0c;搜索“OSS”关键字&#xff0c;进入 OSS 产品页。 第二步&#xff0c;如果…

[Python学习篇] Python解释器

解释器的作用 Python解释器&#xff08;Interpreter&#xff09;的作用&#xff0c;通俗理解&#xff0c;就是起到一个翻译的作用&#xff0c;把程序员所编写的代码翻译为计算机能读懂执行的代码。简单地说&#xff0c;Python解释器对输入的Python代码进行解释和执行。Python解…

(科研实践篇)大模型相关知识

1.embedding 1.介绍&#xff1a; embedding就是用一个低纬的向量表示一个物品。而这个embedding向量的实质就是使距离相似的向量所对应的物品具有相似的含义&#xff08;参考皮尔逊算法和cos余弦式子&#xff1a;计算相似度&#xff09;简单来说&#xff0c;就是用空间去表示…

【解决方案】荣耀系统Android8.0 system目录Read-only file system

本来以为直接把Charles证书改成系统证书格式&#xff0c;然后通过mt管理器root之后移动到系统证书目录就行了&#xff0c;结果访问baidu仍然显示网络错误&#xff0c;折腾一晚上。安装为用户证书&#xff0c;又与系统证书冲突。 手机型号&#xff1a;荣耀v10 EMUI&#xff1a…

[蓝桥杯练习]通电

kruskal做法(加边) #include <bits/stdc.h> using namespace std; int x[10005],y[10005],z[10005];//存储i点的x与y坐标 int bcj[10005];//并查集 struct Edge{//边 int v1,v2; double w; }edge[2000005]; int cmp(Edge a, Edge b){return a.w < b.w;} int find(i…

flask的使用学习笔记1

跟着b站学的1-06 用户编辑示例_哔哩哔哩_bilibili flask是一个轻量级&#xff0c;短小精悍&#xff0c;扩展性强&#xff0c;可以扩展很多组件&#xff0c;django大而全 编程语言它们的区别&#xff1a; (这些语言都很了解&#xff0c;java和python是高级语言&#xff0c;都…

TCP的十个重要的机制

注&#xff1a;TCP不是只有十个机制 TCP 可靠传输是tcp最为重要的核心&#xff08;初心&#xff09; 可靠传输&#xff0c;并不是发送方把数据能够100%的传输给接收方 而是退而求其次 让发送方发送出去数据之后&#xff0c;能够知道接收方是否收到数据。 一但发现对方没有…

Head First Design Patterns -代理模式

什么是代理模式 代理模式为另一个对象提供替身或者占位符&#xff0c;以便控制客户对对象的访问&#xff0c;管理访问的方式有很多种。例如远程代理、虚拟代理、保护代理等。 远程代理&#xff1a;管理客户和远程对象之间的交互。 虚拟代理&#xff1a;控制访问实例化开销大的对…

upload-labs训练平台

GitHub&#xff1a;GitHub - Tj1ngwe1/upload-labs: 一个帮你总结所有类型的上传漏洞的靶场 把下好的文件夹之间拖入到小皮的WWW目录下就可以之间访问网址使用了 目录 Pass-01(前端JS的绕过) (1)抓包绕过 (2)在前端绕过 Pass-02&#xff08;content-type绕过&#xff09;…

LabVIEW专栏五、网口

该节目标编写一个网口调试VI。 上一章是串口&#xff0c;这章介绍网口的写法。 一、网口硬件 1.1、上位机网口 1.2、网口线 由线缆和水晶头组成&#xff0c;现在一般用5类和超5类的网线 1.3、接线方式 忽略&#xff0c;这里加上这点为了提醒一个硬件和上位机连接&#xf…

【并发编程】CountDownLatch

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;并发编程 ⛺️稳中求进&#xff0c;晒太阳 CountDownLatch 概念 CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。 CountDownLatch 定义了一个计数器&#xff0c;…

【多线程】震惊~这是我见过最详细的ReentrantLock的讲解

一.与synchronized相比ReentrantLock具有以下四个特点: 可中断&#xff1a;synchronized只能等待同步代码块执行结束&#xff0c;不可以中断&#xff0c;强行终断会抛出异常, 而reentrantlock可以调用线程的interrupt方法来中断等待&#xff0c;继续执行下面的代码。 在获取锁…

【C++学习】哈希表的底层实现及其在unordered_set与unordered_map中的封装

文章目录 1. unordered系列关联式容器1.1 unordered_map1.2 unordered_set1.3.底层结构 2.哈希2.1哈希概念2.2哈希冲突2.3 哈希函数2.4 哈希冲突解决2.4.1闭散列2.4.1开散列2.5开散列与闭散列比较 3.哈希的模拟实现1. 模板参数列表2. 迭代器的实现3. 增加通过key获取value操作4…

基于 Quartz.NET 可视化任务调度平台 QuartzUI

一、简介 QuartzUI 是基于 Quartz.NET3.0 的定时任务 Web 可视化管理&#xff0c;Docker 打包开箱即用、内置 SQLite 持久化、语言无关、业务代码零污染、支持 RESTful 风格接口、傻瓜式配置、异常请求邮件通知等。 二、部署 QuartzUI 从 2022 年到现在没有提交记录&#xf…

YOLO火灾烟雾检测数据集:20000多张,yolo标注完整

YOLO火灾烟雾检测数据集&#xff1a;一共20859张图像&#xff0c;yolo标注完整&#xff0c;部分图像应用增强 适用于CV项目&#xff0c;毕设&#xff0c;科研&#xff0c;实验等 需要此数据集或其他任何数据集请私信