Java多线程实战-从零手搓一个简易线程池(二)线程池与拒绝策略实现

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️本系列源码仓库:多线程并发编程学习的多个代码片段(github)

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

目录

前言

1.线程池的设计

1.1.线程池的七大参数

1.2.线程池的基本原理和工作流程

2.线程池对象的实现

2.1.核心属性和构造函数

2.2.execute方法的实现

2.3.Worker线程的实现

3.拒绝策略的设计

3.1.拒绝策略的作用

3.2.拒绝策略接口的定义

3.3.任务等待队列新增 尝试添加任务方法

3.4.完善execute方法

4.功能测试

总结


✨️本系列源码均已上传仓库 1321928757/Concurrent-MulThread-Demo(github.com)✨️ 

前言

上节内容回顾:Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列-CSDN博客

在多线程程序中,线程的创建和销毁是一个频繁且代价高昂的操作。如果每次有新任务到来都创建一个新线程,将会导致系统资源的巨大浪费。为了更高效地利用线程资源,我们需要线程池来统一管理和复用线程。

线程池可以避免频繁创建和销毁线程的开销,提高系统的响应速度。同时,线程池还能够设置线程数量上限,防止无限制创建线程导致资源耗尽。因此,在高并发场景下,线程池是必不可少的重要工具。

在前面的文章中,我们讲解了任务堵塞队列的实现,这篇文章会基于任务堵塞队列实现一个简易的线程池,在后续的文章中,还会继续对本章编写的线程池进行功能扩展和优化。

1.线程池的设计

1.1.线程池的七大参数

在上一章节我们已经实现了workQueue等待队列(原先我们的命名为BlockQueue,本节也跟着官方定义改为WorkQueue),本节内容会继续实现corePoolSize,keepAliveTime,unit,handler等基本参数的功能落地,threadFactory线程工厂maximumPoolSize线程池最大线程数这些内容会在我们后续文章对线程池进行扩展补充时实现。

1.2.线程池的基本原理和工作流程

本章节线程池的基本原理如下:

  1. 新任务到达时,首先判断目前正在运行的线程数是否小于核心线程数。
  2. 如果小于,就创建工作线程去执行任务
  3. 如果大于等于,说明没有空闲的工作线程,我们将任务加入等待队列
  4. 每个工作线程执行完当前线程后都会继续尝试去等待队列中获取任务
  5. 如果工作线程超过我们规定的空闲线程存活时间,就会被回收

ps:JDK官方提供的线程池的线程回收只会回收非核心线程,本章节的实现的线程池是一个简易版,为了方便理解,没有分核心线程和非核心线程,全归类为工作线程,后续文章会继续扩展,加入线程工厂,核心线程和非核心线程等内容~

2.线程池对象的实现

2.1.核心属性和构造函数

创建线程池对象:

我们这里首先创建我们的线程池对象(可先定义线程池接口,如JDK官方的ExecutorService接口)

/*** @author Luckysj @刘仕杰* @description 自定义线程池对象* @create 2024/03/27 10:45:17*/
@Slf4j
public class ThreadPool {}

核心属性字段与构造函数定义:

其中workerSet这个集合用来存放我们正在运行的工作线程,Worker为我们封装的工作线程,后面会实现

    /** 任务等待队列 */private WorkQueue<Runnable> workQueue;/** 正在运行的工作线程集合 */private final Set<Worker> workerSet = new HashSet<>();/** 核心线程数 */private int corePoolSize;/** 最大等待时间(也就是线程的最大空闲时间) */private Long keepAliveTime;/** 等待时间单位 */private TimeUnit timeUnit;public ThreadPool(WorkQueue<Runnable> workQueue, int corePoolSize, Long keepAliveTime, TimeUnit timeUnit) {this.workQueue = workQueue;this.corePoolSize = corePoolSize;this.keepAliveTime = keepAliveTime;this.timeUnit = timeUnit;}

2.2.execute方法的实现

当用户创建了我们的线程池后,可以通过execute(task)来传入待执行的任务,这个方法是线程池比较核心的一个方法。

传入任务后,首先判断当前运行的线程是否小于我们规则的核心线程数,如果小于,那么就创建线程去执行该任务,如果大于,说明没有空闲线程了,我们需要加入任务等待队列中

public void execute(Runnable task){synchronized(workerSet){//1 判断当前运行的工作线程数是否小于核心线程数if(workerSet.size() <  corePoolSize){// 2.1 创建工作线程Worker worker = new Worker(task);// 2.2 加入运行线程集合workerSet.add(worker);// 2.3 运行线程worker.start();}else{// 2.1 尝试将任务加入阻塞队列中等待(put方法会一直堵塞等待,后面会改进)workQueue.put(task);}}}

2.3.Worker线程的实现

为了方便使用,我们封装工作线程Worker:

class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {log.info("工作线程{}开始运行", Thread.currentThread());// 1。首先消费当前任务,消费完再去任务队列取,while循环实现线程复用while(task != null || (task = workQueue.poll(keepAliveTime, timeUnit)) != null){try {task.run();}catch (Exception e){throw new RuntimeException(e);}finally {// 执行完后清除任务task = null;}}// 2.跳出循环,说明取任务超过了最大等待时间,线程歇菜休息吧synchronized (workerSet){workerSet.remove(this);}log.info("线程{}超过最大空闲时间没有获取到任务,已被回收", Thread.currentThread());}}

Worker工作线程开始运行后,会进入while循环,首先消费当前任务,如果自身没有任务,就去等待队列中拿取任务,通过poll方法超时堵塞拿取,当超过keepAliveTime(最大空闲时间)没有拿到线程时,那么就会跳出循环,从工作线程集合中删除当前线程。

如上我们就完成了简易线程池对象的基本设计,但是如果任务很多,任务等待队列被装满了,后续添加任务会一直被堵塞,所以我们要引入拒绝策略,针对队列满了后来的任务进行一个特殊处理。

3.拒绝策略的设计

3.1.拒绝策略的作用

高并发的应用场景中,任务的到达速度可能会暂时超过线程池的处理能力,导致任务队列处于已满状态。这种情况下,如果仍然有新任务到来,线程池就需要采取一些策略来拒绝这些新任务,避免资源耗尽。

拒绝策略定义了线程池在任务队列已满时应当执行的操作,不同的拒绝策略会产生不同的效果。合理地设置拒绝策略,可以保证线程池在高负载情况下的稳定性,防止资源被无限制占用。

3.2.拒绝策略接口的定义

我们通过RejectPolicy接口来定义拒绝策略,它只有一个reject方法:

public interface RejectPolicy<T> {void reject(BlockQueue<T> queue, T task);
}

其中:

  • queue是当前的任务队列
  • task是被拒绝的任务

不同的拒绝策略需要实现该接口,并在reject方法中定义具体的拒绝操作。用户可以在创建线程池时通过传入自定义的拒绝策略实现类,从而实现自定义的任务拒绝处理方案。本次只会编写一个简单的拒绝策略做测试,后续文章会继续扩展,实现几个默认的拒绝策略。

3.3.任务等待队列新增 尝试添加任务方法

拒绝添加任务这个动作是属于任务等待队列这个对象的,所以我们为任务等待队列新增一个方法

// 尝试向队列添加任务,如果队列已满就触发拒绝策略public void tryPut(RejectPolicy<T> rejectPolicy, T task){lock.lock();try {if(deque.size() == size){// 队列满了就触发拒绝策略log.info("拒绝策略触发,当前任务:{}", task);rejectPolicy.reject(this, task);}else{// 队列没满就将任务加入队列log.debug("没有空闲线程,加入任务等待队列等待");deque.addLast(task);emptyCondition.signal();}}finally {lock.unlock();}}

这个方法需要传入当前的拒绝策略和待添加的任务,如果队列已经满了,就会触发拒绝策略

3.4.完善execute方法

在线程池对象的excute方法中,我们通过tryPut来向等待队列中添加任务

public void execute(Runnable task){synchronized(workerSet){//1 判断当前运行的工作线程数是否小于核心线程数if(workerSet.size() <  corePoolSize){// 2.1 创建工作线程Worker worker = new Worker(task);// 2.2 加入运行线程集合workerSet.add(worker);// 2.3 运行线程worker.start();}else{// 2.1 尝试将任务加入阻塞队列中等待,如果加入失败,触发拒绝策略workQueue.tryPut(rejectPolicy, task);}}}

4.功能测试

以上我们就完成了一个极简版的线程池,接下来我们会做一些测试来测试线程池能否正常使用。

定义测试类,传入长度为5的等待队列,核心线程池数为2,最大空闲时间为5S,拒绝策略为直接丢弃(使用了Lamda写法)。

执行四次打印任务,按照预期应该是后两次任务会加入等待队列等待。

@Slf4j
public class MainTest {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(new WorkQueue<>(5), 2, 5L, TimeUnit.SECONDS,(queue, task) -> {// 一直等//queue.put(task);// 调用者线程执行//task.run();// 直接抛出异常// throw new RuntimeException("saa");// 丢弃这个任务log.debug("丢弃这个任务{}", task);});for (int i = 0; i < 4; i++) {threadPool.execute(() -> {System.out.println("执行任务------->当前执行线程为" + Thread.currentThread().toString());try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}});}}
}

执行结果如下,可以看到有两次任务加入到了等待队列中(多线程日志可能有时候存在顺序问题)

我们将执行次数改为10次,那么预期将会有三条任务被拒绝

总结

本章节基于上节的任务等待队列实现了一个简易线程池,实现了任务等待,线程复用与拒绝策略等功能,大体上来说,本次的多线程实战的手搓线程池部分就差不多完结了,后续有时间的话可能还会出一期功能完善篇,可以扩展的功能如下,小伙伴们也能在我架子上自行扩展:

  • 定义线程工厂,通过线程工厂来创建核心线程和非核心线程
  • 线程池会根据最大核心线程数和总线程数的情况来管理心线程和非核心线程
  • 实现几个拒绝策略,并配置默认拒绝策略
  • 线程池生命周期管理,提供start()、shutdown()、shutdownNow()等方法,允许用户主动控制线程池的启停。
  • 添加任务执行回馈机制,当前版本的线程池无法获知已提交任务的执行状态和结果。比如支持Future/FutureTask,允许用户追踪任务状态、获取执行结果、取消任务等。
  • 异常处理策略,当前对任务执行过程中的异常只是简单抛出,缺少统一的异常处理策略,可以考虑提供自定义的异常处理器接口,允许用户实现自己的异常处理逻辑,比如记录日志、任务重试等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/289067.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文件操作(下)(想要了解如何操作文件,那么看这一片就足够了!)

前言&#xff1a;在文件操作&#xff08;上&#xff09;中&#xff0c;我们讲到了基础的文件操作&#xff0c;包括文件的打开&#xff0c;文件的关闭&#xff0c;以及文件的基础读写&#xff0c;那么除了之前学习的读写之外&#xff0c;还有什么其他的方式对文件进行读写操作吗…

Python提示‘ModuleNotFoundError: No module named ‘numpy.core._multiarray_umath‘

一、问题背景 在学习Python编程使用matplotlib时&#xff0c;总是提示: ModuleNotFoundError: No module named numpy.core._multiarray_umath 问题大致描述如下&#xff1a; D:\WorkSpace\PythonWorkSpace\Python编程-从入门到实践\venv\Scripts\python.exe D:\WorkSpace\Pyt…

Jenkins用户角色权限管理

Jenkins作为一款强大的自动化构建与持续集成工具&#xff0c;用户角色权限管理是其功能体系中不可或缺的一环。有效的权限管理能确保项目的安全稳定&#xff0c;避免敏感信息泄露。 1、安装插件&#xff1a;Role-based Authorization Strategy 系统管理 > 插件管理 > 可…

ES面试题

1、如何同步索引库 同步调用 在完成数据库操作后&#xff0c;直接调用搜索服务提供的接口 异步通知 在完成数据库操作后&#xff0c;发送MQ消息 搜索服务监听MQ&#xff0c;接收到消息后完成数据修改 监听binlog 2、分词器 ik分词器 ik_smart ik_max_word 自定义分词器 以拼…

安静:内向性格的竞争力 - 三余书屋 3ysw.net

精读文稿 这期我们介绍的这本书叫做《安静》&#xff0c;副标题是《内向性格的竞争力》。本书共有267页&#xff0c;我会用大约25分钟的时间为你讲述书中的精髓。内向性格具备什么样的竞争力&#xff1f;内向性格的人在人际交往和日常生活中似乎总是吃亏&#xff0c;因为他们不…

Postman传对象失败解决

文章目录 情景复现解决方案总结 情景复现 postman中调用 debug发现pId传入失败 分析解释&#xff1a; 实体类中存在pId、uid和num字段 controller层将GoodsCar作为请求体传入 解决方案 当时觉得很奇怪&#xff0c;因为uid和num可以被接收&#xff0c;而pId和num的数据类型相…

安卓Activity上滑关闭效果实现

最近在做一个屏保功能&#xff0c;需要支持如图的上滑关闭功能。 因为屏保是可以左右滑动切换的&#xff0c;内部是一个viewpager 做这个效果的时候&#xff0c;关键就是要注意外层拦截触摸事件时&#xff0c;需要有条件的拦截&#xff0c;不能影响到内部viewpager的滑动处理…

学习Fast-LIO系列代码中相关概念理解

目录 一、流形和流形空间&#xff08;姿态&#xff09; 1.1 定义 1.2 为什么要有流形? 1.3 流形要满足什么性质&#xff1f; (1) 拓扑同胚 (2) 可微结构 1.4 欧式空间和流形空间的区别和联系? (1) 区别&#xff1a; (2) 联系&#xff1a; 1.5 将姿态定义在流形上比…

深入解析《企业级数据架构》:HDFS、Yarn、Hive、HBase与Spark的核心应用

写在前面 进入大数据阶段就意味着进入NoSQL阶段&#xff0c;更多的是面向OLAP场景&#xff0c;即数据仓库、BI应用等。 大数据技术的发展并不是偶然的&#xff0c;它的背后是对于成本的考量。集中式数据库或者基于MPP架构的分布数据库往往采用的都是性能稳定但价格较为昂贵的小…

创建VUE项目

设置淘宝源 npm config set registry https://registry.npm.taobao.org 或安装 npm install -g cnpm --registryhttps://registry.npm.taobao.org 创建项目cjhtest 1.vue create cjhtest 1.1 ? Please pick a preset: vue2_vuex_router ([Vue 2] less, babel, router, v…

上位机图像处理和嵌入式模块部署(qmacvisual之ROI设定)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 ROI&#xff0c;全称是region of interest&#xff0c;也就是感兴趣区域。这里面一般分成两种情况&#xff0c;一种是所有的算法都依赖于这个ROI&a…

管理阿里云服务器ECS -- 网站选型和搭建

小云&#xff1a;我已经学会了如何登录云服务器ECS了&#xff0c;但是要如何搭建网站呢&#xff1f; 老王&#xff1a;目前有很多的个人网站系统软件&#xff0c;其中 WordPress 是使用非常广泛的一款&#xff0c;而且也可以把 WordPress 当作一个内容管理系统&#xff08;CMS…

JavaScript 权威指南第七版(GPT 重译)(四)

第九章&#xff1a;类 JavaScript 对象在第六章中有所涉及。该章将每个对象视为一组独特的属性&#xff0c;与其他对象不同。然而&#xff0c;通常有必要定义一种共享某些属性的对象类。类的成员或实例具有自己的属性来保存或定义它们的状态&#xff0c;但它们还具有定义其行为…

A - Environment-Friendly Travel Gym - 102501A

题意&#xff1a;给你一些交通方式和站点&#xff0c;不同的交通方式碳排放不一样&#xff0c;问从起点到终点距离不超过B的路径中最少的碳排放是多少。 思路&#xff1a;二维dijkstra&#xff0c;建图什么的倒不是很难&#xff0c;主要就是对二维dij的理解了&#xff1b; 表示…

HTTPS:原理、使用方法及安全威胁

文章目录 一、HTTPS技术原理1.1 主要技术原理1.2 HTTPS的工作过程1.2.1 握手阶段1.2.2 数据传输阶段 1.3 CA证书的签发流程1.4 HTTPS的安全性 二、HTTPS使用方法三、HTTPS安全威胁四、总结 HTTPS&#xff08;全称&#xff1a;Hyper Text Transfer Protocol over Secure Socket …

如何使用 ArcGIS Pro 自动矢量化水系

对于某些要素颜色统一的地图&#xff0c;比如电子地图&#xff0c;可以通过图像识别技术将其自动矢量化&#xff0c;这里为大家介绍一下 ArcGIS Pro 自动矢量化水系的方法&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微图中下载的电子地图数据&#…

借鉴LangChain思想使用Java实现大模型Function_Call工具开发及调用功能

关注公众号&#xff0c;回复“工具调用”获取代码。 背景 博主之前研究的是ChatGLM3模型&#xff0c;该模型提供了Openai方式调用工具的代码。但后续转到Qwen1.5模型后&#xff0c;好像不可以直接用Openai接口的方式调用工具了。 然后&#xff0c;博主研究了Qwen-Agent框架&a…

Stable Diffusion XL之核心基础内容

Stable Diffusion XL之核心基础内容 一. Stable Diffusion XL核心基础内容1.1 Stable Diffusion XL的主要优化1.2 SDXL整体架构初识1.3 VAE模型1.VAE基本介绍2. VAE基本模型结构3.VAE的训练 1.4 U-Net模型&#xff08;Base部分&#xff09;1. 十四个基本模块概述2. SDXL_Spatia…

[Linux_IMX6ULL驱动开发]-基础驱动

驱动的含义 如何理解嵌入式的驱动呢&#xff0c;我个人认为&#xff0c;驱动就是嵌入式上层应用操控底层硬件的桥梁。因为上层应用是在用户态&#xff0c;是无法直接操控底层的硬件的。我们需要利用系统调用&#xff08;open、read、write等&#xff09;&#xff0c;进入内核态…

tensorflow安装以及在Anaconda中安装使用

在遥感领域进行深度学习时&#xff0c;通常使用python进行深度学习&#xff0c;会使用到tensorflow的安装&#xff0c;今天小编就给大家介绍如何在Anaconda中安装tensorflow&#xff01; 下载Anaconda Index of /anaconda/archive/ | 清华大学开源软件镜像站 | Tsinghua Open…