Java 自定义线程池实现

自定义线程池

  • 简介
  • 任务图示
  • 阻塞队列 BlockingQueue<T>
    • ReentrantLock
    • 代码
  • 线程池 ThreadPool
    • 工作线程类 Worker
  • 拒绝策略接口
  • 代码测试类 TestThreadPool
    • 为什么需要j = i?(lambad表达式相关)
  • 测试结果
    • 拒绝策略:让调用者自己执行任务
    • 拒绝策略:让调用者放弃任务执行

简介

学习 黑马JUC 并发编程 过程中根据教程完成的线程池案例

任务图示

在这里插入图片描述
线程池Thread Pool保存工作资源,即n个工作线程,通过从任务队列Blocking Queue中获取m个任务执行。

阻塞队列 BlockingQueue

注意ReentrantLock类的使用

包括任务队列、生产者消费者锁、容量capcity、阻塞获取、带时阻塞获取、阻塞添加、带时阻塞添加、带拒绝策略的添加

Blocking Queue对获取任务或者添加任务的操作进行阻塞管理,分别设置生产者锁(任务队列满则阻塞生产者,停止添加任务)、消费者锁(任务队列空则阻塞消费者,停止获取任务)。

ReentrantLock

基本语法:

reentrantLock.lock();
try{// 临界区
} finally {// 释放锁reentrantLock.unlock();
}

创建条件变量,以及 阻塞 和 唤醒 的基本使用
在这里插入图片描述

代码

@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {// 1. 任务队列private Deque<T> queue = new ArrayDeque<>();// 2. 锁private ReentrantLock lock = new ReentrantLock();// 3. 生产者条件变量private Condition fullWaitSet = lock.newCondition();// 4. 消费者条件变量private Condition emptyWaitSet = lock.newCondition();// 5. 容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}// 带超时阻塞获取public T poll(long timeout, TimeUnit unit) {lock.lock();try {//将timeout统一转换为 纳秒long nanos = unit.toNanos(timeout);//如果任务队列为空,在消费者阻塞队列中,超时时间内循环阻塞while(queue.isEmpty()){try{if (nanos <= 0){return null;}//超时时间内进行 空队列阻塞nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}T t = queue.removeFirst();//唤醒 生产者阻塞队列 中的线程fullWaitSet.signal();return t;}finally{lock.unlock();}}// 阻塞获取public T take() {lock.lock();try{//循环阻塞while(queue.isEmpty()){try {emptyWaitSet.await();}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = queue.removeFirst();fullWaitSet.signal();return t;}finally{lock.unlock();}}// 阻塞添加public void put(T task) {lock.lock();try{while(queue.size() >= capcity){try{log.debug("等待加入任务队列————任务:{} ", task);fullWaitSet.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("加入任务队列————任务: {}", task);queue.addLast(task);emptyWaitSet.signal();}finally{lock.unlock();}}// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try{long nanos = timeUnit.toNanos(timeout);while(queue.size() >= capcity){if(nanos <= 0) {return false;}try {log.debug("等待加入任务队列————任务:{} ", task);//awaitNanos返回剩余等待时间nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug("加入任务队列————任务: {}", task);queue.addLast(task);emptyWaitSet.signal();return true;}finally{lock.unlock();}}public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try{//判断队列是否满if(queue.size() >= capcity){//使用拒绝策略rejectPolicy.reject(this, task);} else{log.debug("加入任务队列————任务: {}", task);queue.addLast(task);emptyWaitSet.signal();}}finally{lock.unlock();}}
}

线程池 ThreadPool

包含 阻塞队列 、工作线程集合、线程数量、超时时间设置、执行任务

//线程池
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {// 任务队列private BlockingQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 核心线程数private int coreSize;// 获取任务时的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;//初始化public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}// 执行任务public void execute(Runnable task) {synchronized (workers){if(workers.size() < coreSize){Worker worker = new Worker(task);log.debug("新增 worker 投入工作————工作线程:{}, 任务:{}", worker, task);workers.add(worker);worker.start();}else{taskQueue.tryPut(rejectPolicy, task);}}}class Worker extends Thread{...}
}

工作线程类 Worker

线程池ThreadPool的内部类,主要设置了线程的执行方法,
没有设置超时时间,线程执行完所有任务之后不会被销毁,而是继续等待更多任务,设置超时时间线程则会在长时间获取不到任务后结束执行
while(task != null || (task = taskQueue.take()) != null)循环获取任务队列中的任务并执行
while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) 为带超时的执行方法,循环到超时时间则停止循环

  • 一个线程与一个任务(不带超时)
    在这里插入图片描述

  • 一个线程与一个任务(带超时)
    在这里插入图片描述

class Worker extends Thread{private Runnable task;public Worker(Runnable task){this.task = task;}@Overridepublic void run() {//task不为空执行任务,执行完毕再从任务队列中获取任务并执行//while(task != null || (task = taskQueue.take()) != null){//超时设置while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null){try {log.debug("线程正在执行————任务:{}", task);task.run();} catch(Exception e){e.printStackTrace();} finally {task = null;}}synchronized (workers){log.debug("worker 被移除————工作线程:{}", this);workers.remove(this);}}}

拒绝策略接口

在任务队列满时设置不同的策略处理之后再申请添加的队列

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);
}

代码测试类 TestThreadPool

初始化ThreadPool时使用lambda表达式实现拒绝策略,threadPool.execute(() -> { } 传入的就是任务的内容,设置Thread.sleep(1000L) 用于测试拒绝策略

@Slf4j(topic = "c.TestThreadPool")
public class TestThreadPool{public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2,1000, TimeUnit.MILLISECONDS, 2, (queue, task)->{// 1. 死等// queue.put(task);// 2) 带超时等待// queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行// log.debug("放弃{}", task);// 4) 让调用者抛出异常// throw new RuntimeException("任务执行 失败 " + task);// 5) 让调用者自己执行任务task.run();});for (int i = 0; i < 10; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

为什么需要j = i?(lambad表达式相关)

Lambda表达式内部引用的变量必须是final或者是事实上的final(即在Lambda表达式内部没有改变过它的值)。在Java 8之前,匿名内部类内部引用的局部变量必须声明为final。在Java 8中,这一规则放宽了一些,但要求这些变量的值在编译时确定,所以被称为"effectively final"(事实上的final)。

i虽然在每次循环中都有不同的值,但它是在Lambda表达式内部引用的。由于Lambda表达式的执行是延迟的,当Lambda表达式被执行时,循环可能已经结束,i的值可能已经改变。为了避免这种情况,需要将i赋值给一个final或者事实上的final变量j,以确保Lambda表达式内部引用的变量是不可变的。

因此,通过将i赋值给j,保证了在Lambda表达式内部引用的j是不可变的,从而避免了可能出现的并发问题。

测试结果

拒绝策略:让调用者自己执行任务

跟如上测试代码一样:coreSize 工作线程数 2,queueCapctiy 任务队列容量 2 ,循环执行10个任务,采用 task.run() 主线程执行的拒绝策略

任务0、1被线程执行,2、3 放入任务队列中,之后也被线程执行,其余线程则被主线程直接执行
在这里插入图片描述

拒绝策略:让调用者放弃任务执行

任务0、1被线程执行,2、3 放入任务队列中,之后也被线程执行,其余线程则被放弃
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/284788.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

外包干了6天,技术明显进步。。。

我是一名大专生&#xff0c;自19年通过校招进入湖南某软件公司以来&#xff0c;便扎根于功能测试岗位&#xff0c;一晃便是近四年的光阴。今年8月&#xff0c;我如梦初醒&#xff0c;意识到长时间待在舒适的环境中&#xff0c;已让我变得不思进取&#xff0c;技术停滞不前。更令…

mysql80-DBA数据库学习1

掌握能力 核心技能 核心技能 mysql部署 官网地址www.mysql.com 或者www.oracle.com https://dev.mysql.com/downloads/repo/yum/ Install the RPM you downloaded for your system, for example: yum install mysql80-community-release-{platform}-{version-number}.noarch…

Qt 写一个邮件发送程序

最近在完成一个邮箱代替的告警功能&#xff0c;写了一个邮件发送的demo 以下为代码&#xff1a; #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #include<QTcpSocket> namespace Ui { class MainWindow; }class MainWindow : public QMainWin…

【STL源码剖析】【2、空间配置器——allocator】

文章目录 1、什么是空间配置器&#xff1f;1.1设计一个简单的空间配置器&#xff0c;JJ::allocator 2、具备次配置力( sub-allocation)的 SGI 空间配置器2.1 什么是次配置力2.2 SGI标准的空间配置器&#xff0c;std::allocator2.2 SGI特殊的空间配置器&#xff0c;std::alloc2.…

Java代码基础算法练习-公式求和-2024.03.24

任务描述&#xff1a; 求公式Snaaaaaa…aa…aaa&#xff08;有n个a&#xff09;之值&#xff0c;其中a是一个数字&#xff0c;为2。 例如&#xff0c;n5 时222222222222222&#xff0c;n 由键盘输入(n<5)。 任务要求&#xff1a; package march0317_0331;import java.util.…

【大模型】在VS Code(Visual Studio Code)上安装中文汉化版插件

文章目录 一、下载安装二、配置显示语言&#xff08;一&#xff09;调出即将输入命令的搜索模式&#xff08;二&#xff09;在大于号后面输入&#xff1a;Configure Display Language&#xff08;三&#xff09;重启 三、总结 【运行系统】win 11 【本文解决的问题】 1、英文不…

代码随想录算法训练营第三十一天|455.分发饼干,376.摆动序列,53. 最大子序和

455.分发饼干 题目 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff1b;并且每块饼干 j&#xff…

wy的leetcode刷题记录_Day93

wy的leetcode刷题记录_Day93 声明 本文章的所有题目信息都来源于leetcode 如有侵权请联系我删掉! 时间&#xff1a;2024-3-23 前言 目录 wy的leetcode刷题记录_Day93声明前言2549. 统计桌面上的不同数字题目介绍思路代码收获 827. 最大人工岛题目介绍思路代码收获 200. 岛屿…

【Godot4.2】像素直线画法及点求取函数

概述 基于CanvasItem提供的绘图函数进行线段绘制只需要直接调用draw_line函数就可以了。 但是对于可以保存和赋值节点直接使用的纹理图片&#xff0c;却需要依靠Image类。而Image类没有直接提供基于像素的绘图函数。只能依靠set_pixel或set_pixelv进行逐个像素的填色。 所以…

数字乡村发展策略:科技引领农村实现跨越式发展

随着信息技术的迅猛发展和数字经济的崛起&#xff0c;数字乡村发展策略已经成为引领农村实现跨越式发展的重要手段。科技的力量正在深刻改变着传统农业的生产方式、农村的社会结构以及农民的生活方式&#xff0c;为农村经济发展注入了新的活力和动力。本文将从数字乡村的内涵、…

java每日一题——买啤酒(递归经典问题)

前言&#xff1a; 非常喜欢的一道题&#xff0c;经典中的经典。打好基础&#xff0c;daydayup!!!啤酒问题&#xff1a;一瓶啤酒2元&#xff0c;4个盖子可以换一瓶&#xff0c;2个空瓶可以换一瓶&#xff0c;请问10元可以喝几瓶 题目如下&#xff1a; 啤酒问题&#xff1a;一瓶…

基于图的在线社区假新闻检测建模

论文原文&#xff1a;Graph-based Modeling of Online Communities for Fake News Detection 论文代码&#xff1a;GitHub - shaanchandra/SAFER: Repository containing the official code for the paper Graph-based Modeling of Online Communities for Fake News Detectio…

KIMI爆了!对比文心一言和通义千问它到底有多强?

原文:赵侠客 前言 最近国产大模型KIMI爆了大部分人应该都知道了&#xff0c;从我个人的感受来看这次KIMI爆了我不是从技术领域接触到的&#xff0c;而是从各种金融领域接触到的。目前国内大模型可以说是百模大战&#xff0c;前几年新能源大战&#xff0c;今年资本割完韭菜后留…

java面向对象编程基础

对象&#xff1a; java程序中的对象&#xff1a; 本质上是一种特殊的数据结构 对象是由类new出来的&#xff0c;有了类就可以创建对象 对象在计算机的执行原理&#xff1a; student s1new student();每次new student(),就是在堆内存中开辟一块内存区域代表一个学生对象s1变…

Matlab DDPG

文章目录 1 rlSimulinkEnv1.1 说明1.2 例子1.2.1 使用工作空间Agent创建Simulink环境1.2.2 为Simulink模型创建强化学习环境1.2.3 创建Simulink多Agents环境2 创建Simulink环境和训练Agent2.1 创建环境接口2.2 创建DDPG Agent2.3 训练Agent2.4 验证已训练的Agent3 创建Simulink…

创建linux虚拟机系统:(安装Ubuntu镜像文件,包含语言设置、中文输入法、时间设置)

我下载的是清华大写开源软件镜像站中的ubuntu-20.04.6-desktop-amd64.iso这个镜像文件&#xff0c; 这个文件我下载完成之后没有解压&#xff0c;直接在创建虚拟机的时候选择的压缩包。 地址为&#xff1a;Index of /ubuntu-releases/20.04/ | 清华大学开源软件镜像站 | Tsin…

Git——IDEA中的使用详解

目录 Git1、IDEA中配置Git2、将本地项目推送到远程仓库2.1、创建项目远程仓库2.2、初始化本地仓库2.3、连接远程仓库2.4、提交到本地仓库2.5、推送到远程仓库 3、克隆远程仓库到本地4、基本操作4.1、代码提交到暂存区4.2、暂存区代码提交到本地库4.3、推送到远程仓库4.4、撤销本…

网络: 网络层

IP地址: 分为网络号和主机号. 用来标识主机 IP协议 IP协议报文 4位版本号(version): 指定IP协议的版本, 对于IPv4来说, 就是4.4位头部长度(header length): IP头部的长度是多少个32bit, 也就是 length * 4 的字节数. 4bit表示最大的数字是15, 因此IP头部最大长度是60字节. 8…

HarmonyOS NEXT应用开发案例集

概述 随着应用代码的复杂度提升&#xff0c;为了使应用有更好的可维护性和可扩展性&#xff0c;良好的应用架构设计变得尤为重要。本篇文章将介绍一个应用通用架构的设计思路&#xff0c;以减少模块间的耦合、提升团队开发效率&#xff0c;为开发者呈现一个清晰且结构化的开发…

YOLOv8:Roboflow公开数据集训练模型

Roboflow公开数据集 Roboflow是一个提供计算机视觉数据集管理和处理工具的平台。虽然Roboflow本身并不创建或策划公开数据集&#xff0c;但它提供了一系列功能&#xff0c;帮助用户组织、预处理、增强和导出计算机视觉数据集。 官方网站&#xff1a;https://universe.roboflow…