【并发编程】手写线程池阻塞队列

       📝个人主页:五敷有你      
 🔥系列专栏:并发编程
⛺️稳重求进,晒太阳

示意图 

步骤1:自定义任务队列

变量定义

  1. 用Deque双端队列来承接任务
  2. 用ReentrantLock 来做锁
  3. 并声明两个条件变量 Condition fullWaitSet emptyWaitSet
  4. 最后定义容量 capcity

方法:

  1. 添加任务
    1. 注意点:
      1. 任务容量慢了 用await
      2. 每个添加都进行一个emptyWaitSet.signalAll 唤醒沉睡的线程
      3. 考虑万一死等的情况,加入时间的判断
  2. 取出任务
    1. 注意点:
      1. 任务空了 用await
      2. 每个任务取出来都进行一个fullWaitSet.signAll来唤醒沉睡的线程
      3. 考虑超时的情况,加入时间的判断
public class MyBlockQueue<T> {//1.任务队列private Deque<T> deque=new ArrayDeque();//2.锁private ReentrantLock lock=new ReentrantLock();//3.生产者条件变量private Condition fullWaitSet=lock.newCondition();//4.消费者条件变量private Condition emptyWaitSet=lock.newCondition();//5.容量private int capcity;public MyBlockQueue(int capcity) {this.capcity = capcity;}//带超时的阻塞获取public T poll(long timeOut, TimeUnit unit){lock.lock();try {//将timeOUt转换成统一转换为nslong nanos = unit.toNanos(timeOut);while (deque.isEmpty()) {try {//返回值=等待时间-经过的时间if(nanos<=0){return null;}nanos= emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}//6. 阻塞获取public T take() {lock.lock();try {while (deque.isEmpty()) {try {emptyWaitSet.await();}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}//阻塞添加public void put(T element){lock.lock();try {while (deque.size()==capcity){try {fullWaitSet.await();}catch (Exception e){}}deque.addLast(element);emptyWaitSet.signalAll();} finally {lock.unlock();}}public int size(){lock.lock();try {return deque.size();}finally {lock.unlock();}}}

步骤2:自定义线程池

  1. 定义变量:
    1. 任务队列 taskQueue
    2. 队列的容量
    3. 线程的集合
    4. 核心线程数
    5. 获取任务的超时时间
    6. 时间单位
  2. 方法
    1. 构造方法 初始化一些核心的参数
    2. 执行方法 execute(task) 里面处理任务
      1. 每执行一个任务就放入一个worker中,并开启线程执行 同时放入workers集合中
      2. 当任务数量>核心数量时,就加入到阻塞队列中
  3. 自定义的类worker
    1. 继承Thread 重写Run方法
      1. 执行传递的任务,每次任务执行完毕,不回收,
      2. 去队列中拿任务 当队列也空了之后 workers集合中移除线程,线程停止。
package com.aqiuo.juc;import java.util.HashSet;
import java.util.concurrent.TimeUnit;public class ThreadPool {//任务队列private MyBlockQueue<Runnable> taskQueue;//队列容量int queueCapcity;//线程集合private HashSet<Worker> workers=new HashSet();//线程池的核心线程private int coreSize;//获取任务的超时时间private long timeOut;//时间单位private TimeUnit timeUnit;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;taskQueue=new MyBlockQueue<>(queueCapcity);}public void exectue(Runnable task){//当任务数没有超过coreSize时,直接交给work对象执行//如果任务超过coreSize时,加入任务队列synchronized (workers){if(workers.size()<coreSize){Worker worker=new Worker(task);System.out.println("新增worker");workers.add(worker);worker.start();//任务数超过了核心数}else{System.out.println(task+"加入任务队列");taskQueue.put(task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task){this.task=task;}@Overridepublic void run() {//执行任务//1)当task不为空,执行任务//2)当task执行完毕,再接着从任务队列中获取任务while (task!=null||(task=taskQueue.take())!=null){try {System.out.println("正在执行worker"+this);sleep(10000);task.run();} catch (Exception e) {}finally {task=null;}}//执行完任务后销毁线程synchronized (workers){workers.remove(this);}}}}

测试

开启15个线程测试

public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);for (int i=0;i<15;i++){int j=i;threadPool.exectue(()->{System.out.println(j);});}}

        执行过程中,超过了队列容量之后,就会发生fullWaitSet阻塞。这个阻塞的线程就开始等待,当有队列不满之后,唤醒fullWaitSet阻塞的队列,

        同理,当队列为空,emptyWaitSet小黑屋阻塞,当有任务被放入,EmptyWaitSet唤醒所有的线程。

这就有一个执行完毕之后,线程不会停止,他会一定等待拿去任务,线程阻塞了EmptyWaitSet

改进

获取任务的超时结束

获取任务take的增强 超时

  //带超时的阻塞获取public T poll(long timeOut, TimeUnit unit){lock.lock();try {//将timeOUt转换成统一转换为nslong nanos = unit.toNanos(timeOut);while (deque.isEmpty()) {try {//返回值=等待时间-经过的时间if(nanos<=0){return null;}nanos= emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}

修改worker的run函数

      public void run() {//执行任务//1)当task不为空,执行任务//2)当task执行完毕,再接着从任务队列中获取任务
//            while (task!=null||(task=taskQueue.take())!=null){//修改如下while (task!=null||(task=taskQueue.poll(timeOut,timeUnit))!=null){try {System.out.println("正在执行worker"+this);sleep(1000);task.run();} catch (Exception e) {}finally {task=null;}}

正常结束了

放入任务的超时结束offer()

那么有装入任务 的增强 ,就再提供一个超时装入入offer()吧 ,当放入一个满的队列时,超时后返回false不再放入

//带有超时的队列添加
public Boolean offer(T element,long timeOut, TimeUnit unit){lock.lock();long nanos = unit.toNanos(timeOut);try {while (deque.size()==capcity){try {long l = fullWaitSet.awaitNanos(nanos);if(l<=0){return false;}}catch (Exception e){}}deque.addLast(element);emptyWaitSet.signalAll();return true;} finally {lock.unlock();}
}

拒绝策略

函数式接口

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(MyBlockQueue<T> queue, T task);
}

代码改进

如下部分代码是存入任务的部分

public void exectue(Runnable task){//当任务数没有超过coreSize时,直接交给work对象执行//如果任务超过coreSize时,加入任务队列synchronized (workers){if(workers.size()<coreSize){Worker worker=new Worker(task);System.out.println("新增worker");workers.add(worker);worker.start();//任务数超过了核心数}else{//存入任务//taskQueue.put(task);//当队列满了之后 执行的策略//1) 死等//2)带有超时的等待//3)当调用者放弃任务执行//4)让调用者抛出异常//5)让调用者自己执行任务...//为了增加灵活性,这里不写死,交给调用者//重新写了一个放入任务的方法taskQueue.tryPut(rejectPolicy,task);}}}

阻塞队列里的tryPut

public void tryPut(ThreadPool.RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {//如果队列容量满了,就开始执行拒绝策略if(capcity>= deque.size()){rejectPolicy.reject(this,task);}else{//不满就正常加入到队列中System.out.println(task+"正常加入到队列");deque.addLast(task);}}finally {lock.unlock();}}

//1) 死等

//2)带有超时的等待

//3)当调用者放弃任务执行

//4)让调用者抛出异常

//5)让调用者自己执行任务...

谁调用方法,谁写拒绝策略

为了传入策略,就再构造函数里面加入一个方法的参数传入

//部分代码...
//拒绝策略
RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;taskQueue=new MyBlockQueue<>(queueCapcity);this.rejectPolicy=rejectPolicy;
}

主函数编写拒绝的策略,就lamda表达式会把...

public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{//死等
//            queue.put(task);//超时添加
//            System.out.println(queue.offer(task, 100, TimeUnit.NANOSECONDS));//放弃执行
//            System.out.print("我放弃");//调用者抛出异常
//            throw new RuntimeException("任务执行失败");//调用者执行
//            task.run();});for (int i=0;i<5;i++){int j=i;threadPool.exectue(()->{System.out.println(j);});}}

五种拒绝策略的结果(我不会用slog4j)

1.死等的结果

2.超时拒绝的结果(每个false都是时间到了,每加进去)

3.不作为,调用者放弃任务

4.抛出异常,停止

5.调用者线程执行了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/252935.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【wu-lazy-cloud-network】Java自动化内网穿透

项目介绍 wu-lazy-cloud-network 是一款基于&#xff08;wu-framework-parent&#xff09;孵化出的项目&#xff0c;内部使用Lazy ORM操作数据库&#xff0c;主要功能是网络穿透&#xff0c;对于没有公网IP的服务进行公网IP映射 使用环境JDK17 Spring Boot 3.0.2 功能 1.内网…

办公软件巨头CCED、WPS面临新考验,新款办公软件异军突起

办公软件巨头CCED、WPS的成长经历 众所周知&#xff0c;CCED和WPS在中国办公软件领域树立了两大知名品牌的地位。然而&#xff0c;它们的成功并非一朝一夕的成就&#xff0c;而是历经了长时间的发展与积淀。 在上世纪80年代末至90年代初&#xff0c;CCED作为中国大陆早期的一款…

Unity 接口、抽象类、具体类对象的配合使用案例

文章目录 示例1&#xff1a;接口&#xff08;Interface&#xff09;示例2&#xff1a;抽象类&#xff08;Abstract Class&#xff09;示例3&#xff1a;结合使用接口与抽象类示例4&#xff1a;多接口实现示例5&#xff1a;抽象类与接口结合 在Unity中使用C#编程时&#xff0c;接…

华为OD机试真题C卷-篇3

文章目录 查找一个有向网络的头节点和尾节点幼儿园篮球游戏 查找一个有向网络的头节点和尾节点 在一个有向图中&#xff0c;有向边用两个整数表示&#xff0c;第一个整数表示起始节点&#xff0c;第二个整数表示终止节点&#xff1b;图中只有一个头节点&#xff0c;一个或者多…

一、SSM 整合理解

本章概要 什么是 SSM 整合&#xff1f;SSM 整合核心问题明确 SSM 整合需要几个 IoC 容器&#xff1f;每个 IoC 容器对应哪些类型组件&#xff1f;IoC 容器之间关系和调用方向&#xff1f;具体多少配置类以及对应容器关系&#xff1f;IoC 初始化方式和配置位置&#xff1f; 1…

用甘特图有效管理多个项目进度

当公司或组织同时承担多个项目时,合理规划各项目的时间节点与资源分配对确保高效完成至关重要。采用甘特图可以直观地展示多个项目的时间进程、关键里程碑以及资源分配情况,便于从宏观层面全面把控各项目的动态。 在线甘特图软件 zz-plan.com 提供了非常强大的时间轴规划功能,支…

Xampp中Xdebug的安装使用

工欲善其事&#xff0c;必先利其器 XDebug简介 XDebug 是一个用于 PHP 的调试和性能分析工具。它提供了一系列功能&#xff0c;帮助开发者在开发和调试 PHP 应用程序时更加高效。 以下是 XDebug 的一些主要特性和功能&#xff1a; 调试功能&#xff1a; 断点调试&#xff1a;…

基础面试题整理7之Redis

1.redis持久化RDB、AOF RDB(Redis database) 在当前redis目录下生成一个dump.rdb文件&#xff0c;对redis数据进行备份 常用save、bgsave命令进行数据备份&#xff1a; save命令会阻塞其他redis命令&#xff0c;不会消耗额外的内存&#xff0c;与IO线程同步&#xff1b;bgsav…

MySql索引分类

目录 第一章、按数据结构分类1.1&#xff09;树型数据结构索引1.2&#xff09;Hash数据结构索引1.3&#xff09; 其他数据结构索引 第二章、按物理存储方式分类2.1&#xff09;聚簇索引&#xff08;聚集索引&#xff09;2.2&#xff09;非聚簇索引&#xff08;非聚集索引&#…

Blender教程(基础)-顶点的移动、滑移-16

一、顶点的移动与缩放 ShiftA新建柱体、切换到编辑模式 点模式下&#xff0c;选择一个顶点、选择移动&#xff08;GZ&#xff09;&#xff0c;发现顶点严Z轴移动&#xff0c;如下图所示 GY 按数字键盘7切换视图&#xff0c;选择这个面的所有顶点 按S把面缩放大 Ctrl…

【大模型上下文长度扩展】FlashAttention-2:比1代加速1.29倍、GPU利用率从55%上升到72%

FlashAttention-2 提出背景FlashAttention-2 改进 前向传播和反向传播对比FlashAttention前向传播FlashAttention反向传播FlashAttention-2前向传播FlashAttention-2反向传播FlashAttention-2并行性线程束之间的工作分区 总结FlashAttentionFlashAttention-2 论文&#xff1a;h…

Typora导出html文件图片自动转换成base64

Typora导出html文件图片自动转换成base64 一、出现问题二、解决方案三、编码实现3.1.创建Java项目3.2.代码3.3.打包成Jar包 四、如何使用endl 一、出现问题 typora 导出 html 的时候必须带有原图片&#xff0c;不方便交流学习&#xff0c;文件太多显得冗余&#xff0c;只有将图…

Golang GC 介绍

文章目录 0.前言1.发展史2.并发三色标记清除和混合写屏障2.1 三色标记2.2 并发标记问题2.3 屏障机制Dijkstra 插入写屏障Yuasa 删除写屏障混合写屏障 3.GC 过程4.GC 触发时机5.哪里记录了对象的三色状态&#xff1f;6.如何观察 GC&#xff1f;方式1&#xff1a;GODEBUGgctrace1…

鸿蒙OS导入项目报错不能运行 @ohos\hvigor\bin\hvigor.js‘

在自学HarmonyOS时&#xff0c;想在DevEco Studio导入官方示例代码&#xff1a;待办列表&#xff08;ArkTS&#xff09;报错 C:\Users\woods\Downloads\test01\ToDoListArkTS\node_modules\ohos\hvigor\bin\hvigor.js --mode module -p moduleentrydefault -p productdefault …

开源软件:引领技术创新、商业模式与安全的融合

序 在信息技术日新月异的今天&#xff0c;开源软件以其独特的魅力和影响力&#xff0c;正逐渐成为软件产业的新常态。开源软件的低成本、高度可协作性和透明度等特点&#xff0c;不仅吸引了无数企业和个人用户的青睐&#xff0c;更为软件行业带来了前所未有的繁荣景象。 一、…

【C++】构造函数、初始化列表,析构函数,拷贝构造函数,运算符重载

注&#xff1a;本博客图片来源于学习笔记: 学习笔记https://gitee.com/box-he-he/learning-notes 完整思维导图请前往该博主码云下载。 目录 注&#xff1a;本博客图片来源于学习笔记: 学习笔记https://gitee.com/box-he-he/learning-notes 完整思维导图请前往该博主码云下载…

微信小程序(三十六)事件传参

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.传参步骤 2.传参接收解构步骤 源码&#xff1a; index.wxml <button type"primary" bind:tap"onclick" mark:index"{{0}}" mark:remb"{{1}}" class"But&quo…

我的QQ编程学习群

欢迎大家加入我的QQ编程学习群。 群号:950365002 群里面有许多的大学生大佬&#xff0c;有编程上的疑惑可以随时问&#xff0c;也可以聊一些休闲的东西。 热烈欢迎大家加入&#xff01;&#xff01; 上限:150人。

华视 CVR-100UC 身份证读取 html二次开发模板

python读卡&#xff1a;python读卡 最近小唐应要求要开发一个前端的身份证读卡界面&#xff0c;结果华视CVR-100UC 的读取界面是在是有点&#xff0c;而且怎么调试连官方最基本的启动程序都执行不了。CertReader.ocx 已成功&#xff0c;后面在问询一系列前辈之后&#xff0c;大…

uniapp 使用renderjs引入echarts

效果图&#xff1a; 1.1renderjs引入echarts 组件zmui-echarts.vue&#xff1a; <template><view class"zmui-echarts" :prop"option" :change:prop"echarts.delay"></view> </template><script>export defaul…