Fork/Join框架
是什么
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork: 把一个大任务切分为若干子任务并行的执行
Join: 合并这些子任务的执行结果,最后得到这个大任务的结果。

运行流程图
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取流程
工作窃取算法的优缺点:
优点:充分利用线程进行并行计算,减少了线程间的竞争。
缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
Fork/Join框架的设计
步骤1 分割任务。需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情。
ForkJoinTask:要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
Fork/Join框架基本使用
需求:计算1+2+3+4的结果。
使用Fork/Join框架首先要考虑到的是如何分割任务,上述需求希望每个子任务最多执行两个数相加,因此,分割阈值设置为2。
有返回结果的任务,所以必须继承RecursiveTask
示例代码如下:
public class CountTask extends RecursiveTask<Integer> {/*** 阈值*/private static final int THRESHOLD = 2;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;// 如果任务足够小就计算任务boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分割成两个子任务计算int middle = (start + end) / 2;CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待子任务执行完,并得到其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();// 生成一个计算任务,负责计算1+2+3+4CountTask task = new CountTask(1, 4);// 执行一个任务Future<Integer> result = forkJoinPool.submit(task);try {System.out.println(result.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException();}}
}
ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,需要判断任务是否足够小(小于阈值),如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。 使用join方法会等待子任务执行完并得到其结果。
Fork/Join框架的异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。
if(task.isCompletedAbnormally()){System.out.println(task.getException());}
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
工作队列:
static final class WorkQueue {static final int INITIAL_QUEUE_CAPACITY = 1 << 13;static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M// Instance fieldsvolatile int scanState; // versioned, <0: inactive; odd:scanningint stackPred; // pool stack (ctl) predecessorint nsteals; // number of stealsint hint; // randomization and stealer index hintint config; // pool index and modevolatile int qlock; // 1: locked, < 0: terminate; else 0volatile int base; // index of next slot for pollint top; // index of next slot for pushForkJoinTask<?>[] array; // the elements (initially unallocated)final ForkJoinPool pool; // the containing pool (may be null)final ForkJoinWorkerThread owner; // owning thread or null if sharedvolatile Thread parker; // == owner during call to park; else nullvolatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoinvolatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer// ...
}
(1)ForkJoinTask的fork方法实现原理
调用ForkJoinTask的fork方法时,程序会将任务丢到任务队列里,然后立即返回结果。
public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;}
static final ForkJoinPool common;
ForkJoinPool.common.externalPush(this);
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;int r = ThreadLocalRandom.getProbe();int rs = runState;if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {int j = ((am & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s + 1);U.putIntVolatile(q, QLOCK, 0);if (n <= 1)signalWork(ws, q);return;}U.compareAndSwapInt(q, QLOCK, 1, 0);}externalSubmit(task);}
上述代码:把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。
(2)ForkJoinTask的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果。代码如下:
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();}private void reportException(int s) {if (s == CANCELLED)throw new CancellationException();if (s == EXCEPTIONAL)rethrow(getThrowableException());}
调用了doJoin()方法,通过doJoin()方法得到当前任务的状态与运算来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。
- 如果任务状态是已完成,则直接返回任务结果。
- 如果任务状态是被取消,则直接抛出CancellationException。
- 如果任务状态是抛出异常,则直接抛出对应的异常。
doJoin()方法的实现代码。
private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();}final int doExec() {int s; boolean completed;if ((s = status) >= 0) {try {completed = exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)s = setCompletion(NORMAL);}return s;}
首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。
相关文章:
Fork/Join框架
是什么 Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 Fork: 把一个大任务切分为若干子任务并行的执行 Join: 合并这些子任务的执行结果,最后…...
LeetCode_字符串_中等_468.验证 IP 地址
目录 1.题目2.思路3.代码实现(Java) 1.题目 给定一个字符串 queryIP。如果是有效的 IPv4 地址,返回 “IPv4” ;如果是有效的 IPv6 地址,返回 “IPv6” ;如果不是上述类型的 IP 地址,返回 “Nei…...
ABAP Der Open SQL command is too big.
ABAP Der Open SQL command is too big. DBSQL_STMNT_TOO_LARGE CX_SY_OPEN_SQL_DB 应该是选择条件中 维护的条件值条数太多了...
QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis)
QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis) 1、数据序列类 继承关系 2、坐标轴类 的继承关系 3、图例类 什么是图例? 图例:是集中于地图…...
Servlet+JDBC实战开发书店项目讲解第10篇:在线客服功能实现
在线客服功能实现 实现思路 要实现在线客服功能,您可以考虑以下步骤: 创建一个用于存储客户消息和回复的数据库表。您可以使用JDBC连接到数据库,并使用SQL语句创建表格。 在您的Servlet中,创建一个用于处理客户消息和回复的POS…...
CVE-2023-21292 AMS框架层高危漏洞分析
文章目录 前言漏洞细节故事起源漏洞利用漏洞修复 总结 前言 本周在分析 Google 官方发布的 Android 2023 年8 月安全公告 涉及的漏洞补丁的时候,遇到一个有意思的漏洞:CVE-2023-21292。 之所以说它有意思是因为这个漏洞早在去年年底就在某平台上被国外…...
cuda、cuDNN、深度学习框架、pytorch、tentsorflow、keras这些概念之间的关系
当讨论CUDA、cuDNN、深度学习框架、pytorch、tensorflow、keras这些概念的时候,我们讨论的是与GPU加速深度学习相关的技术和工具。 CUDA(Compute Unified Device Architecture): CUDA是由NVIDIA开发的一种并行计算平台和编程模型&…...
第二讲:BeanFactory的实现
BeanFactory的实现 1. 环境准备2. 初始化DefaultListableBeanFactory3. 手动注册BeanDefinition4. 手动添加后置处理器5. 获取被依赖注入的Bean对象6. 让所有的单例bean初始化时加载7. 总结 Spring 的发展历史较为悠久,因此很多资料还在讲解它较旧的实现,…...
vue2+Spring Boot2.7 大文件分片上传
之前我们文章 手把手带大家实现 vue2Spring Boot2.7 文件上传功能 将了上传文件 但如果文件很大 就不太好处理了 按正常情况甚至因为超量而报错 这里 我弄了个足够大的文件 我们先搭建 Spring Boot2.7 环境 首先 application.yml 代码编写如下 server:port: 80 upload:path:…...
Vite更新依赖缓存失败,强制更新依赖缓存
使用vitets开发一段时间了,感觉并不是想象中的好用,特别是出现些稀奇古怪的问题不好解决,比如下面这个问题 上午9:50:08 [vite] error while updating dependencies: Error: ENOENT: no such file or directory, open E:/workspace-dir/node…...
Linux命令200例:tail用来显示文件的末尾内容(常用)
🏆作者简介,黑夜开发者,全栈领域新星创作者✌。CSDN专家博主,阿里云社区专家博主,2023年6月csdn上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 &…...
【Unity每日一记】进行发射,位置相关的方法总结
👨💻个人主页:元宇宙-秩沅 👨💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨💻 本文由 秩沅 原创 👨💻 收录于专栏:uni…...
MISRA 2012学习笔记(3)-Rules 8.4-8.7
文章目录 Rules8.4 字符集和词汇约定(Character sets and lexical conventions)Rule 4.1 八进制和十六进制转译序列应有明确的终止识别标识Rule 4.2 禁止使用三字母词(trigraphs) 8.5 标识符(Identifiers)Rule 5.1 外部标识符不得重名Rule 5.2 同范围和命名空间内的标识符不得重…...
centos7组件搭建
Linux(包括centos) 如何查看服务器内存、CPU su - root 切换用户 centos 密码 空格 https://blog.csdn.net/weixin_45277161/article/details/131524555 CentOS 7 安装 Docker 的详细步骤 https://blog.csdn.net/qq_39997939/article/details/13100…...
webpack5和webpack4的一些区别
自动清除打包目录 webpack4 // bash npm i clean-webpack-plugin -D //webpack.config.js const {CleanWebpackPlugin} require(clean-webpack-plugin); module.exports {plugins: [new CleanWebpackPlugin()} } webpack5 module.exports {output: {clean: true} } topLevel…...
攻防世界-fileclude
原题 解题思路 直接展示源码了,flag.php应该存放了flag,在file1与file2都不为空且file2是“hello ctf”时file1将被导入。接下来做法很明显,让file为flag.php,file2为“hello ctf”。“?file1php://filter/readconvert.base64-en…...
深度学习的“前世今生”
1、“感知机”的诞生 20世纪50年代,人工智能派生出了这样两个学派,分别是“符号学派”及“连接学派”。前者的领军学者有Marvin Minsky及John McCarthy,后者则是由Frank Rosenblatt所领导。 “符号学派”的人相信对机器从头编程,…...
第一百一十九回 如何通过蓝牙设备读写数据
文章目录 概念介绍实现方法示例代码经验总结我们在上一章回中介绍了如何获取蓝牙状态相关的内容,本章回中将介绍 如何通过蓝牙设备读写数据。闲话休提,让我们一起Talk Flutter吧。 概念介绍 通过蓝牙设备读写数据有两种方法: 一种是读写Characteristics;一种是读写Descri…...
linux:Temporary failure in name resolutionCouldn’t resolve host
所有域名无法正常解析。 ping www.baidu.com 等域名提示 Temporary failure in name resolution错误。 rootlocalhost:~# ping www.baidu.com ping: www.baidu.com: Temporary failure in name resolution rootlocalhost:~# 一、ubuntu/debian(emporary failure i…...
C 语言的 sprintf() 函数
<stdio.h> 原型: int sprintf(char *str, const char *format, …) 发送格式化输出到 str 所指向的字符串。 参数 str – 这是指向一个字符数组的指针,该数组存储了 C 字符串。 format – 这是字符串,包含了要被写入到字符串 str 的文本。它…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...
C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
基于TurtleBot3在Gazebo地图实现机器人远程控制
1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践
作者:吴岐诗,杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言:融合数据湖与数仓的创新之路 在数字金融时代,数据已成为金融机构的核心竞争力。杭银消费金…...
