高并发设计模式之ForkJoin模式

分而治之是一种思想,所谓分而治之就是把一个复杂的算法问题按一定的分解方法分为规模较小的若干部分,然后逐个解决,分别找出各部分的解,最后把各部分的解在整合成整个问题的解.ForkJoin模式就是分而治之思想的另一种应用.

ForkJoin模式的原理

ForkJoin模式先把一个大任务分解成许多独立的子任务,然后开启多个线程并行去处理这些子任务.执行的过程大致如下.

 ForkJoin框架

JUC包提供了一套ForkJoin框架的实现,具体以ForkJoinPool线程池的形式提供,并且该线程池在java8的Lambda并行流框架中充当着底层框架的角色.包含如下组件.

1:ForkJoinPool 执行任务的线程池,继承了 AbstractExecutorService类.

2:ForkJoinWorkerThread 执行任务的工作线程.每个线程都维护这一个内部队列,用于存放内部任务.该类继承了Thread类.

3:ForkJoinTask: 用于ForkJoinPool的任务抽象类,实现了Future接口.

4: RecursiveAction 不返回结果的递归执行任务,是ForkJoinTask的子类,在子任务不带返回结果时使用.

ForkJoin框架的使用实战

假设需要计算0-100的累加求和,可以使用ForkJoin框架完成.首先需要设计一个可以递归执行的异步子任务类.

1:可递归执行的异步任务类AccumulateTask

参考代码如下.

public class AccumulateTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 2;/*** 累加的起始编号.*/private int start;/*** 累加的结束编号.*/private int end;public AccumulateTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//判断任务的规模.若规模小则可以直接计算.Boolean canCompute = end - start <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}System.out.println("执行任务,计算" + start + "到" + end + "的和,结果是" + sum);} else {//任务过大需要进行切割.int middle = (start + end) / 2;//切割成两个任务.AccumulateTask lTask = new AccumulateTask(start, middle);AccumulateTask rTask = new AccumulateTask(middle + 1, end);//依次调用每个子任务的fork方法.lTask.fork();rTask.fork();//等待子任务完成.依次调用每个子任务的join方法合并并执行结果.Integer leftResult = lTask.join();Integer rightResult = rTask.join();//合并子任务的结果.sum = leftResult + rightResult;}return sum;}
}

自定义的异步任务子类AccumulateTask继承自RecursiveTask,每一次执行可以携带返回值.在AccumulateTask通过THRESHOLD常量设置子任务分解的阈值,并在它的compute()方法中会进行阈值的判断.逻辑如下.

1:若当前的计算规模大于THRESHOLD,则当前任需要进一步分解.相反则直接求和.

2:如果子任务可以直接执行,则进行求和操作,并返回结果.如果任务进行了分解,就需要等待所有的任务执行完毕,然后对各个分解结果进行求和.

ForkJoinPool调度AccumulateTask()

参考代码如下.

public class ForkJoinTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ForkJoinPool forkJoinPool = new ForkJoinPool();AccumulateTask task = new AccumulateTask(1, 100);ForkJoinTask<Integer> future = forkJoinPool.submit(task);Integer integer = future.get(1, TimeUnit.SECONDS);System.out.println("最终计算结果为" + integer);}
}

ForkJoinPool框架的核心API

 ForkJoinPool框架的核心是ForkJoinPool线程池.该线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,咋可能被挂起,而挂起的线程将被压入ForkJoinPool维护的栈中,等待新任务到来.

1:ForkJoinPool的构造器

public ForkJoinPool(int parallelism,//并行度 默认为cpu数,最小为1.ForkJoinWorkerThreadFactory factory,//线程创建工厂UncaughtExceptionHandler handler,//异常处理程序boolean asyncMode) {///是否为异步模式.this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();}

 parallelism:并行级别

设定的级别决定框架内并行线程的数量.并行的每一个任务都会有一个线程进行处理,但这个属性并不是框架的最大线程数量,该属性也和ThreadPoolExecutor线程池中的核心线程数和最大线程数有区别.因为他们的工作方式不一样.Forkjoin框架中可存在的线程数量和parallelism参数并不是绝对的关联.

Factory:线程创建工厂

当ForkJoin框架创建一个新的线程,同样会用到线程工厂,只不过这个线程不在需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口.后者是一个函数接口.只需要实现一个叫newThread()方法,在ForkJoin框架中有一个默认的实现:DefaultForkJoinWorkerThreadFactory.

handler:异常捕获处理程序

当执行任务中出现异常,并从任务中被抛出时,就会被handler捕获.

 asyncMode:异步模式

该参数表示任务是否为异步模式,默认值为false.如果参数值为true,表示子任务的执行遵循FIFO(先进先出)顺序,并且子任务不能被合并,如果为false,表示子任务的执行顺序为LIFO(后进先出),并且子任务可以被合并.虽然表面意思为异步,仅仅指任务调度.

ForkJoinPool的common通用池

private static ForkJoinPool makeCommonPool() {final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =new CommonPoolForkJoinWorkerThreadFactory();int parallelism = -1;ForkJoinWorkerThreadFactory factory = null;UncaughtExceptionHandler handler = null;try {  // ignore exceptions in accessing/parsing propertiesString pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");String fp = System.getProperty("java.util.concurrent.ForkJoinPool.common.threadFactory");String hp = System.getProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler");if (pp != null)parallelism = Integer.parseInt(pp);if (fp != null)factory = ((ForkJoinWorkerThreadFactory)ClassLoader.getSystemClassLoader().loadClass(fp).newInstance());if (hp != null)handler = ((UncaughtExceptionHandler)ClassLoader.getSystemClassLoader().loadClass(hp).newInstance());} catch (Exception ignore) {}if (factory == null) {if (System.getSecurityManager() == null)factory = commonPoolForkJoinWorkerThreadFactory;else // use security-managed defaultfactory = new InnocuousForkJoinWorkerThreadFactory();}if (parallelism < 0 && // default 1 less than #cores(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP;return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,"ForkJoinPool.commonPool-worker-");}

使用common线程池的优点是可以通过指定系统属性的方式定义并行度 线程工厂和异常处理类. 

向ForkJoinPool线程池提交任务的方式

1:外部任务提交 

提交外部任务有三种方式:方式一使用invoke方法,该方法提交任务后线程会等待,等到任务计算完毕并返回结果.方法二使用execute()方法提交一个任务来异步执行,无返回结果.方法三使用submit方法提交一个任务,会返回一个ForkJoinTask实例,适当的时候获取结果.

2:子任务提交.

由任务实例的fork方法完成,当任务被分解以后,内部会调用ForkJoinPool.WorkQueue.push方法直接把任务放到内部队列中等待被执行.

工作窃取算法

 ForkJoinPool线程池的任务分为外部任务和内部任务,两种任务存放的位置不同.

1:外部任务放在ForkJoinPool全局队列中.

2:子任务会存放在内部队列中,ForkJoinPool线程池中的每个线程池都维护这一个内部队列.用于存放这些内部任务.

由于ForkJoinPool线程池有多个工作线程,与之相对应的就会有多个任务队列,就会出现任务分配不均衡的问题,有的任务队列任务多,忙的不停,有的队列没有任务一直空闲.工作窃取方法可以完美解决.

工作窃取的核心思想是:工作线程自己的活干完了,会去看看别人有没有没干完的活,如果有就拿过来帮忙做.每个线程拥有一个双端队列用于存放需要执行的任务,当自己队列没任务的时候,可以从其他队列里获取任务执行.

ForkJoin框架的原理

1:ForkJoin框架的线程池ForkJoinPool分为外部任务和内部任务.

2:外部任务是放在ForkJoinPool的全局队列中.

3:ForkJoinPool池中的每个线程都维护这一个内部任务队列用于存放内部任务,线程池切割的子任务就放在内部任务当中.

4:当工作线程想要拿到子任务的计算结果时,先判断子任务有没有完成.如果没有完成,再去判断子任务有没有被窃取.如果子任务没有被窃取就由本线程来完成.如果被窃取了,就去执行内部队列的其他任务.或者去扫描其他队列的任务.

5:当工作线程完成自己内部的任务,就会去扫描其他队列的任务.尽可能不会阻塞等待.

月盈则亏,月缺则满.修心则以.

如果大家喜欢我的分享的话,可以关注下我的微信公众号

心有九月星辰

 

 

 
 
 

 
 
 
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/461045.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AMD XILINX 20nm器件价格上调25%

随着市场回暖&#xff0c;台积电也在调整价格策略&#xff0c;近期台积电上调了20nm的出厂价格。 据相关消息显示&#xff0c;AMD为了保障持续的供货和服务&#xff0c;也计划将20nm器件的价格统一上调25%&#xff0c;预计将于11月发布正式的涨价通知&#xff0c;并于2025年Q1开…

EfficientNet-B6模型实现ISIC皮肤镜图像数据集分类

项目源码获取方式见文章末尾&#xff01; 回复暗号&#xff1a;13&#xff0c;免费获取600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【基于opencv答题卡识别判卷】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【G…

为何我们要将测试左移?回到过去的美好时光

以下为作者观点&#xff1a; 为何我们将测试左移&#xff1f;在传统的开发周期中&#xff0c;测试通常在功能完成后甚至在开发阶段结束时进行。左移测试通过从开发过程开始到整个开发过程整合测试活动来挑战这一点。 让我们首先讨论一下为什么我们选择“左移”&#xff0c;因…

java项目之基于智能推荐的卫生健康系统(springboot)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的基于智能推荐的卫生健康系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 基于智能推荐…

性能测试详解

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 一、 性能测试术语解释 1. 响应时间 响应时间即从应用系统发出请求开始&#xff0c;到客户端接收到最后一个字节数据为止所消耗的时间。响应时间按软件的特点…

深度学习基础—循环神经网络(RNN)

引言 从本系列博客开始&#xff0c;我们将来一起学习一下NLP领域的相关基础知识&#xff0c;NLP领域重要的模型是RNN&#xff0c;在此之前&#xff0c;先来了解一些符号的含义。 1.符号定义 &#xff08;1&#xff09;符号定义 假设建立一个能够自动识别句中人名位置的序列模型…

【工具变量】自由贸易试验区试点DID数据集(2003-2023年)

数据简介&#xff1a;自由贸易试验区&#xff08;Free Trade Zone&#xff0c;简称FTZ&#xff09;是中国ZF在新形势下为了推进GG开放、提高开放型经济水平而采取的重要战略举措。自贸试验区在一国的部分领土内运入任何货物&#xff0c;被认为在关境以外&#xff0c;免于实施惯…

Flask

创建项目 Pycharm专业版 默认文件 Pycharm社区版没有自动创建这几个文件&#xff0c;手动创建即可。 运行 常规功能 debug模式 修改内容自动更新&#xff0c;否则需要重新启动运行项目才生效。 修改host 通网络内其他人可以通过我得ip访问该服务。 修改端口号 空格分隔…

[Wireshark] 使用Wireshark抓包https数据包并显示为明文、配置SSLKEYLOGFILE变量(附下载链接)

前言 wireshark安装包 链接&#xff1a;https://pan.quark.cn/s/febb28f57c01 提取码&#xff1a;fUCQ 链接失效&#xff08;可能会被官方和谐&#xff09;可评论或私信我重发 chrome与firefox在访问https网站的时候会将密钥写入这个环境变量SSLKEYLOGFILE中&#xff0c;在wir…

野火鲁班猫4 (RK3588)系统配置

烧写系统 参考文档 &#xff1a; https://doc.embedfire.com/linux/rk3588/quick_start/zh/latest/quick_start/apt/apt.html 先装第一个软件&#xff0c;然后打开第二个软件。点固件&#xff0c;选择Ubuntu最新的固件&#xff0c;这边目前是20240911这个。 我这边直接烧写到…

Servlet 3.0 新特性全解

文章目录 Servlet3.0新特性全解Servlet 3.0 新增特性Servlet3.0的注解Servlet3.0的Web模块支持servlet3.0提供的异步处理提供异步原因实现异步原理配置servlet类成为异步的servlet类具体实现异步监听器改进的ServletAPI(上传文件) Servlet3.0新特性全解 tomcat 7以上的版本都支…

[OceanBase-不止于记录]:揭秘双引擎战略,共探AI时代数据架构未来

前言 又到了一年一度大家最爱的探会文章&#xff0c;非常荣幸收到OceanBase官方的邀请参加2024 OceanBase 年度发布会&#xff0c;作为一个经常参加线下探会的博主&#xff0c;每一次体验都有所不同&#xff0c;每一次新技术的突破都让人感到无比兴奋。同时&#xff0c;作为数…

【论文复现】短期电力负荷

作者主页&#xff1a; 七七的个人主页 文章收录专栏&#xff1a; 论文复现 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01;&#x1f496;&#x1f496; 短期电力负荷 论文发表问题背景一. 基本问题二. 本论文发现的问题 对于论文发现问题的解决方案&#xff1a;复现…

Java-I/O框架:FileReader类使用、FileWriter类的使用、字符流复制文件

视频链接&#xff1a;16.19 字符流复制文件_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1Tz4y1X7H7?spm_id_from333.788.videopod.episodes&vd_sourceb5775c3a4ea16a5306db9c7c1c1486b5&p19 1.FileReader类&#xff08;文件字符输出流&#xff09;使用 pac…

快速生成高质量提示词,Image to Prompt 更高效

抖知书老师推荐&#xff1a; 随着 AI 技术的不断发展&#xff0c;视觉信息与语言信息之间的转换变得越来越便捷。在如今的数字化生活中&#xff0c;图像与文字的交互需求愈发旺盛&#xff0c;很多人都希望能轻松将图像内容直接转化为文本描述。今天我们来推荐一款实用的 AI 工…

网安秋招面试

《Java代码审计》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484219&idx1&sn73564e316a4c9794019f15dd6b3ba9f6&chksmc0e47a67f793f371e9f6a4fbc06e7929cb1480b7320fae34c32563307df3a28aca49d1a4addd&scene21#wechat_redirect 《Web安全》h…

Python小游戏14——雷霆战机

首先&#xff0c;你需要确保安装了Pygame库。如果你还没有安装&#xff0c;可以使用pip来安装&#xff1a; bash pip install pygame 代码如下&#xff1a; python import pygame import sys import random # 初始化Pygame pygame.init() # 设置屏幕大小 screen_width 800 scr…

编程之路:蓝桥杯备赛指南

文章目录 一、蓝桥杯的起源与发展二、比赛的目的与意义三、比赛内容与形式四、比赛前的准备五、获奖与激励六、蓝桥杯的影响力七、蓝桥杯比赛注意事项详解使用Dev-C的注意事项 一、蓝桥杯的起源与发展 蓝桥杯全国软件和信息技术专业人才大赛&#xff0c;简称蓝桥杯&#xff0c…

Python3 No module named ‘pymysql‘

在使用python3链接数据库时&#xff0c;总是提示 No module named pymysql 错误&#xff0c;执行pip3 install pymysql后&#xff0c;提示安装成功&#xff0c;但是执行py文件还是提示此错误。 使用python2 执行时&#xff0c;链接数据库正确&#xff0c;百思不得其解 先使用…

SpringBoot调用SOAP接口步骤详解。

1、引入依赖 <dependency><groupId>org.springframework.ws</groupId><artifactId>spring-ws-core</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-oxm</ar…