Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️本系列源码仓库:多线程并发编程学习的多个代码片段(github)

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

目录

前言

等待队列是什么

为什么需要等待队列

实现思路

代码实现

1.新建BlockQueue类

2.任务的添加和获取方法

定义阻塞添加任务方法put

编写堵塞拿取任务方法take

带超时时间的阻塞添加方法offer

带超时时间的阻塞获取方法poll

总结


✨️本系列源码均已上传仓库 1321928757/Concurrent-MulThread-Demo(github.com)✨️

前言

在多线程编程中,线程池是一种非常重要的工具。它可以帮助我们高效地管理线程资源,避免频繁创建和销毁线程带来的性能开销。Java中提供了强大的线程池实现,如 ThreadPoolExecutor ,但有时我们可能希望了解其内部原理,并实现一个简单的线程池来加深对其工作机制的理解,手写线程池也是很多大厂常考的笔试题。

开个新坑-手搓简易线程池。本系列文章将从零开始,一步步手工编码实现一个简单但功能完备的线程池。我们将逐步介绍线程池的各个核心组件,并分析它们的作用和实现思路。通过这个过程,我们可以更好地掌握多线程编程的技巧,并培养编码能力和系统性思维。

本篇文章的任务是带领大家定义任务等待队列,任务等待队列是线程池中一个十分重要的组成部分,在各种生产者-消费者模型场景下十分常见,如下我们先来介绍一下等待队列。

等待队列是什么

等待队列(Task Queue)是一种用于临时存储任务的队列数据结构。在多线程环境下,它常被用作生产者-消费者模型中的"缓冲区",用于平衡任务的产生速度和任务的处理速度之间的差异。

等待队列本质上是一个先进先出(FIFO)的队列,新加入的任务会被存储在队列尾部,而消费者则从队列头部获取任务执行。根据队列的实现方式不同,它可以是无界的或者有界的。无界队列理论上可以存储无限多的任务,而有界队列则有最大容量限制。

我们这里的等待队列底层可以借助JDK提供的双端队列ArrayDeque实现。

为什么需要等待队列

在现实场景中,任务的产生速度往往是不均匀的,而执行任务的线程资源又是有限的。如果没有等待队列,当任务瞬间扩增时,可能会出现以下问题:

  1. 任务无法被及时消费,导致任务丢失或被阻塞。
  2. 需要临时创建大量线程来处理任务,线程的创建和销毁开销巨大,影响系统性能。
  3. 任务的执行顺序无法得到保证,可能导致某些重要任务长期得不到执行。

引入等待队列后,生产者可以先将任务存入队列,而消费者则持续从队列中获取并执行任务。这种"缓冲"机制可以有效应对任务瞬间扩增的情况,并保证任务按先后顺序被逐个消费。

此外,通过设置有界队列,我们还可以限制队列的最大容量,防止任务无限堆积导致内存溢出。当队列已满时,我们可以采取拒绝策略(如直接丢弃、暂存等)来应对新加入的任务。

如下图,任务等待队列其实就是联系任务生成者与任务消费者的一个桥梁,生产者生产消息放入等待队列中,再由消费者拿取消费。

实现思路

为了实现一个功能完备的任务等待队列,我们需要设计一个阻塞队列BlockQueue,它具有以下几个关键特性:

  1. 有界队列 BlockQueue将设置一个固定的容量size,队列中最多只能存储size个任务。这样可以防止任务无限制地堆积,导致内存溢出。当队列满时,新添加的任务将被阻塞,直到队列中有空位。

  2. 线程安全 BlockQueue的操作,包括添加任务put和获取任务take,都需要保证线程安全。我们将使用Java的重入锁ReentrantLock和条件变量Condition来实现线程的正确同步。

  3. 支持超时 在添加或获取任务时,BlockQueue将提供带超时时间的方法offer和poll。这样可以防止任务被无限期阻塞,提高系统的响应性和健壮性。

基于以上思路,BlockQueue的实现将涉及以下几个核心部分:

  1. 使用双端队列Deque作为底层数据结构存储任务
  2. 使用ReentrantLock和两个Condition(emptyCondition和fullCondition)来实现阻塞和唤醒机制
  3. 实现put、offer、take、poll等核心方法,正确控制任务的添加和获取

代码实现

1.新建BlockQueue类

我们这里新建一个名为BlockQueue的类,并声明一些属性与字段,其中ArrayDeque为一个双端队列,可以在队列的两端操作元素,size为定义的队列大小

public class BlockQueue<T> {// 双端队列private Deque<T> deque = new ArrayDeque<>();// 队列的容量private int size;public BlockQueue(int size) {this.size = size;}
}

2.任务的添加和获取方法

接下来我们编写向队列中添加任务与获取任务的方法,但是这里要注意的是,读写操作都是在线程池多线程的环境下进行的,存在线程安全问题,所以我们需要配合锁机制来保证操作的原子性。我们这里用JDK提供的ReentrantLock可重入锁来解决这个问题。

继续新增ReentrantLock字段:

 private ReentrantLock lock = new ReentrantLock();

此外,我们还需要解决任务添加与获取时的等待操作,也就是当队列为空时,消费者需要等待任务产生,当队列满时,生产者需要等待队列中有空位才能存入任务。如下图,当消费者获取到锁时,会尝试获取任务,但发现队列为空,就会阻塞等待。

当生产者生产任务后,肯定不能让消费者干等着,而是去通知消费者有活做啦~如下图:

我们可以看到这里涉及到了生产者与消费者不同线程的通讯,这里我们可以借助Condition来完成消费者线程与生产者线程之间的通讯。

分别定义两个Condition代表队列满情况的等待室与队列空情况的等待室

    // 队列空情况的休息室Condition emptyCondition = lock.newCondition();// 队列满情况的休息室Condition fullCondition = lock.newCondition();
定义阻塞添加任务方法put
// 添加任务 阻塞添加public void put(T task) {lock.lock();try {while (size == deque.size()) {try {fullCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("task 添加成功 ,{}", task);deque.addLast(task);emptyCondition.signal();} finally {lock.unlock();}}

这里添加任务会存在两张情况:

  • 队列满了:调用fullCondition.await()方法挂起当前生产者线程,也就是让当前生产者线程等待。
  • 队列没满:将任务加入队列中,并调用emptyCondition.signal()通知挂起的消费者。
编写堵塞拿取任务方法take
    // 获取任务public T take() {lock.lock();try {while (deque.isEmpty()) {try {emptyCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullCondition.signal();log.debug("获取了任务 {}", t);return t;} finally {lock.unlock();}}

同理,这里拿取任务也会存在两张情况:

  • 队列空的:调用emptyCondition.await()方法挂起当前消费者线程,也就是让当前消费者线程等待。
  • 队列存在元素:任务出栈,并返回出栈的任务元素,然后调用fullCondition.signal()通知挂起的生产者。

以上我们其实我们就完成了一个简单的任务堵塞队列,但是我们会发现,如果这两个方法都是会一直堵塞,显然是不合理的,所以我们这里新增添加和获取任务的超时方法。

带超时时间的阻塞添加方法offer
// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (deque.size() == size) {try {if (nanos <= 0) {return false;}log.debug("等待加入任务队列 {} ...", task);nanos = fullCondition.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);deque.addLast(task);emptyCondition.signal();return true;} finally {lock.unlock();}}

我们可以利用fullCondition.awaitNanos来实现超时等待,当超过给定参数时间时,就会被自动唤醒,并且将传入时间-等待时间作为返回值,下一次循环nanos <= 0时,就会判定为本次操作超时失败了

带超时时间的阻塞获取方法poll
// 带超时时间阻塞获取public T poll(long timeout, TimeUnit unit) {// 1.上锁lock.lock();try {long nanos = unit.toNanos(timeout); // 转为毫秒// 2.首先检查队列是否存在元素while(deque.isEmpty()){try {// 2.1超时判断,返回值是剩余时间if(nanos <= 0){return null;}// 2.2超时等待log.debug("等待获取任务");nanos = emptyCondition.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}// 3.拿取元素T task = deque.removeFirst();log.info("任务拿取成功:{}", task);// 4.唤醒挂起的生产者fullCondition.signal();return task;} finally {// 释放锁lock.unlock();}}

总结

我们在本节内容中实现线程池中一个重要的组件 - 任务等待队列BlockQueue,编写的对应的代码也已同步到了github仓库中,估计还要两次文章的内容才能完成手写线程池部分的内容,后面主要要完成的就是线程池内部运行的基本逻辑,自定义拒绝策略等内容了,博主也是边学边实践边输出教学文章,如果有什么问题都可以在评论区留言,觉得写得不错的话就多多支持吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/286458.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】结构体

个人主页点这里~ 结构体 一、结构体类型的声明1、结构的声明2、结构体变量的创建和初始化3、声明时的特殊情况4、自引用 二、结构体内存对齐1、对齐规则2、存在内存对齐的原因3、修改默认对齐数 三、结构体传参四、结构体实现位段 一、结构体类型的声明 我们在指针终篇中提到过…

从零自制docker-5-【USER Namespace NETWORK Namespace】

文章目录 USER Namespace代码NETWORK Namespace代码块 USER Namespace 即进程运行在一个新的namespace中&#xff0c;且该namespace中的User ID和Group IDA在该namespace内外可以不同&#xff0c;可以实现在namspace的用户是root但是对应到宿主机并不是root Cloneflags增加一…

3款免费甘特图制作工具的比较和选择指南

GanntProject GanttProject https://www.ganttproject.biz/ 是一款项目管理和调度应用&#xff0c;适用于 Windows、macOS 和 Linux。它易于使用&#xff0c;无需任何设置&#xff0c;适用于个人用户和小型团队。该应用提供任务层次结构和依存关系、里程碑、基准行、Gantt 图表…

AI论文速读 | 具有时间动态的路网语义增强表示学习

论文标题&#xff1a; Semantic-Enhanced Representation Learning for Road Networks with Temporal Dynamics 作者&#xff1a; Yile Chen&#xff08;陈亦乐&#xff09; ; Xiucheng Li&#xff08;李修成&#xff09;; Gao Cong&#xff08;丛高&#xff09; ; Zhifeng Ba…

卓健易控zj-v8.0设备智能控费系统

卓健易控zj-v8.0设备智能控费系统 详细可联系&#xff1a;19138173009 在现今医疗技术日新月异、突飞猛进的时代&#xff0c;我院服务患者的实力与日俱增。随着先进辅助检查设备的不断完善和引进&#xff0c;医生们如同得到了得力助手&#xff0c;能够为患者做出更加精确的诊断…

TCP重传机制详解——04FACK

文章目录 TCP重传机制详解——04FACK什么是FACKFACK的发展为什么要引入FACK实战抓包讲解开启FACK场景&#xff0c;且达到dup ACK门限值开启FACK场景&#xff0c;未达到dup ACK门限值 为什么要淘汰FACK总结REF TCP重传机制详解——04FACK 什么是FACK FACK的全称是forward ackn…

JVM(二)——垃圾回收

三、垃圾回收 1、如何判断对象可以回收 1&#xff09;引用计数法 定义&#xff1a; 在对象中添加一个引用计数器&#xff0c;每当有一个地方引用它时&#xff0c;计数器值就加一&#xff1b;当引用失效时&#xff0c;计数器值就减一&#xff1b;任何时刻计数器为零的对象就是…

Java面试篇:Redis使用场景问题(缓存穿透,缓存击穿,缓存雪崩,双写一致性,Redis持久化,数据过期策略,数据淘汰策略)

目录 1.缓存穿透解决方案一:缓存空数据解决方案二&#xff1a;布隆过滤器 2.缓存击穿解决方案一:互斥锁解决方案二:设置当前key逻辑过期 3.缓存雪崩1.给不同的Key的TTL添加随机值2.利用Redis集群提高服务的可用性3.给缓存业务添加降级限流策略4.给业务添加多级缓存 4.双写一致性…

MySQL substr函数使用详解

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

景联文科技上新高质量大模型训练数据!

在过去的一年中&#xff0c;人工智能领域呈现出了风起云涌的态势&#xff0c;其中模型架构、训练数据、多模态技术、超长上下文处理以及智能体发展等方面均取得了突飞猛进的发展。 在3月24日举办的2024全球开发者先锋大会的大模型前沿论坛上&#xff0c;上海人工智能实验室的领…

c语言--内存函数的使用(memcpy、memcmp、memset、memmove)

目录 一、memcpy()1.1声明1.2参数1.3返回值1.4memcpy的使用1.5memcpy模拟使用1.6注意 二、memmove()2.1声明2.2参数2.3返回值2.4使用2.5memmove&#xff08;&#xff09;模拟实现 三、memset3.1声明3.2参数3.3返回值3.4使用 四、memcmp()4.1声明4.2参数4.3返回值4.4使用 五、注…

MySQL-extra常见的额外信息

本文为大家介绍MySQL查看执行计划时&#xff0c;extra常见的额外信息 Using index 表示使用了覆盖索引&#xff0c;即通过索引树可以直接获取数据&#xff0c;不需要回表。 表结构: CREATE TABLE t1 (id int(11) NOT NULL AUTO_INCREMENT,name varchar(255) DEFAULT NULL,ag…

IP SSL证书注册流程

使用IP地址申请SSL证书&#xff0c;需要用公网IP地址申请&#xff0c;申请之前确保直接的IP地址可以开放80或者443端口两者选择1个就好&#xff0c;端口不需要一直开放&#xff0c;只要认证的几分钟内开放就可以了&#xff0c;然后IP地址根目录可以上传txt文件。 IP SSL证书认…

vue3+vite - 报错 import.meta.glob() can only accept string literals.(详细解决方案)

报错说明 在vue3+vite项目中,解决报错: [plugin:vite:import-analysis] import.meta.glob() can only accept string literals. 如果我们报错差不多,就可以完美搞定这个错误。 解决教程 这个错误,是因为

【STM32嵌入式系统设计与开发】——9Timer(定时器中断实验)

这里写目录标题 一、任务描述二、任务实施1、ActiveBeep工程文件夹创建2、函数编辑&#xff08;1&#xff09;主函数编辑&#xff08;2&#xff09;USART1初始化函数(usart1_init())&#xff08;3&#xff09;USART数据发送函数&#xff08; USART1_Send_Data&#xff08;&…

蓝桥杯学习笔记(贪心)

在很久很久以前&#xff0c;有几个部落居住在平原上&#xff0c;依次编号为1到n。第之个部落的人数为 t 有一年发生了灾荒&#xff0c;年轻的政治家小蓝想要说服所有部落一同应对灾荒&#xff0c;他能通过谈判来说服部落进行联台。 每次谈判&#xff0c;小蓝只能邀请两个部落参…

HarborCDN技术分析

一、介绍 简要介绍 ​​Harbor​​ 是由VMware公司开源的企业级的Docker Registry管理项目&#xff0c;它包括权限管理(RBAC)、LDAP、日志审核、管理界面、自我注册、镜像复制和中文支持等功能。Harbor 的所有组件都在 Dcoker 中部署&#xff0c;所以 Harbor 可使用 Docker C…

php反序列化刷题1

[SWPUCTF 2021 新生赛]ez_unserialize 查看源代码想到robots协议 看这个代码比较简单 直接让adminadmin passwdctf就行了 poc <?php class wllm {public $admin;public $passwd; }$p new wllm(); $p->admin "admin"; $p->passwd "ctf"; ec…

Redis中的事件

事件 概述 Redis服务器是一个事件驱动程序:服务器需要处理以下两类事件: 1.文件事件(file event):Redis服务器通过套接字与客户端(或者其他Redis服务器)进行连接&#xff0c;而文件事件就是服务器对套接字操作的抽象。服务器与客户端(或者其他服务器)的通信会产生相应的文件…

java串口接收和发送消息集成Springboot

写在前面&#xff1a;1、jdk我用的1.8.0_31 ,不能用太高的java版本。 2、&#xff08;1&#xff09;将rxtxParallel.dll和rxtxSerial.dll文件放到${JAVA_HOME}&#xff08;jdk目录,不是jre目录&#xff09;\jre\bin目录下 如&#xff1a; C:\Program Files\Java\jdk1.8.0_31\…