work-stealing算法 ForkJoinPool
专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162
本文目标:
- 重点是通过例子程序理解work-stealing算法原理
目录
- work-stealing算法
- 算法原理和优缺点介绍
- 使用场景
- work-stealing例子代码
- ForkJoinPool
- new ForkJoinPool()
- ForkJoinTask
- 例子代码1
- 例子代码2
work-stealing算法
算法原理和优缺点介绍
work-stealing算法是一种用于多线程并行计算中的任务调度算法。该算法的核心思想是允许空闲的线程从其他忙碌线程的工作队列中“窃取”任务来执行,以提高整体的资源利用率和系统的吞吐量。
在work-stealing算法中,每个线程通常都维护一个自己的任务队列,用于存储需要执行的任务。当某个线程完成自己的任务队列中的所有任务后,它会尝试从其他线程的任务队列中窃取任务。为了防止多个线程同时窃取同一个线程的任务,通常需要使用一些同步机制,如锁或原子操作等。
work-stealing算法的优点在于它能够动态地平衡负载,使得各个线程之间的任务分配更加均匀,从而提高了系统的并行效率和资源利用率。此外,该算法还具有较好的可扩展性和适应性,能够随着任务量的增加或减少而自动调整线程的数量和工作负载。
然而,work-stealing算法也存在一些挑战和限制。例如,在窃取任务时需要进行同步操作,这可能会增加一定的开销。此外,如果任务之间存在数据依赖关系,那么窃取任务可能会破坏这种依赖关系,从而导致错误的结果。因此,在使用work-stealing算法时需要根据具体的应用场景和任务特点进行权衡和选择。
介绍部分来自ai回答。可以简单理解如下:
一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。
线程1 - 队列1(任务1,任务2,任务3,…)
线程2 - 队列2(任务1,任务2,任务3,…)
比如线程1早早的把队列中任务都处理完了有空闲,但是队列2执行任务较慢;这样队列2中任务可以让线程1帮忙执行(即窃取
线程1的任务)
使用场景
work-stealing算法主要应用于多线程并行计算场景,特别是在任务数量不确定、任务粒度不均匀或者负载容易波动的情况下。以下是一些具体的应用场景:
并行计算框架:work-stealing算法被广泛应用于各种并行计算框架中,如Intel TBB(Threading Building Blocks)、Cilk Plus以及Java的ForkJoinPool等。这些框架利用work-stealing算法来动态地平衡各个线程之间的负载,提高并行计算的效率。
大数据处理:在大数据处理领域,如Hadoop、Spark等分布式计算框架中,work-stealing算法可以用于优化任务调度。通过允许空闲节点窃取其他忙碌节点的任务,可以更加均衡地分配工作负载,提高整个集群的处理能力。
高性能计算:在高性能计算领域,work-stealing算法也被用于优化并行任务的调度。特别是在处理大规模科学计算和模拟仿真等任务时,work-stealing算法能够有效地平衡各个计算节点之间的负载,提高整体的计算效率。
实时系统:在实时系统中,任务的及时完成至关重要。work-stealing算法可以通过动态地调整任务分配,确保各个线程都能够及时完成任务,从而提高系统的实时性能。
云计算和虚拟化环境:在云计算和虚拟化环境中,资源的使用是动态的,并且负载容易波动。work-stealing算法可以用于优化虚拟机或容器之间的任务调度,确保资源的有效利用和负载均衡。
总之,work-stealing算法适用于各种需要高效利用多线程并行计算能力的场景。通过动态地平衡负载和提高资源利用率,它能够显著地提高系统的并行效率和整体性能。
work-stealing例子代码
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;class Task implements Runnable {private final int taskId;public Task(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try{TimeUnit.MILLISECONDS.sleep(300);// 线程0执行的任务慢一点,可以被窃取if (taskId % 4 == 0) {TimeUnit.MILLISECONDS.sleep(800);}}catch (Exception e){}System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());}public int getTaskId() {return taskId;}
}class WorkerThread extends Thread {private final BlockingQueue<Task> taskQueue;private final List<WorkerThread> allWorkers;private final AtomicInteger taskIdGenerator;private volatile boolean running = true;public WorkerThread(BlockingQueue<Task> taskQueue, List<WorkerThread> allWorkers, AtomicInteger taskIdGenerator) {this.taskQueue = taskQueue;this.allWorkers = allWorkers;this.taskIdGenerator = taskIdGenerator;}@Overridepublic void run() {while (running) {Task task = null;try {// Try to retrieve a task from this worker's queuetask = taskQueue.poll(100, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}if (task != null) {// Execute the retrieved tasktask.run();} else {// If no task is retrieved, try to steal a task from another workertask = stealTask();if (task != null) {System.out.println("task " + task.getTaskId() + " stolen by " + Thread.currentThread().getName());task.run();}}}}private Task stealTask() {List<WorkerThread> shuffledWorkers = new ArrayList<>(allWorkers);Collections.shuffle(shuffledWorkers);for (WorkerThread worker : shuffledWorkers) {if (worker != this && !worker.taskQueue.isEmpty()) {Task stolenTask = worker.taskQueue.poll();if (stolenTask != null) {return stolenTask;}}}return null;}public void addTask(Task task) {taskQueue.offer(task);}public void stopTask() {running = false;}public static void main(String[] args) {int numWorkers = 4;BlockingQueue<Task> sharedQueue = new LinkedBlockingQueue<>();List<WorkerThread> workers = new ArrayList<>();AtomicInteger taskIdGenerator = new AtomicInteger(0);// Create and start worker threadsfor (int i = 0; i < numWorkers; i++) {BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>();WorkerThread worker = new WorkerThread(taskQueue, workers, taskIdGenerator);workers.add(worker);worker.start();}// Add tasks to the shared queue (or directly to worker queues for simplicity in this example)for (int i = 0; i < 20; i++) {int taskId = taskIdGenerator.incrementAndGet();int selectWorkId = taskId % numWorkers;Task task = new Task(taskId);workers.get(selectWorkId).addTask(task); // Distribute tasks round-robin for simplicity}// Let the workers run for some time before stopping themtry {Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}// Stop all workersfor (WorkerThread worker : workers) {worker.stopTask();}// Wait for all workers to terminate (not strictly necessary in this example, but good practice)for (WorkerThread worker : workers) {try {worker.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
注意:
- 在这个示例中,我们使用了BlockingQueue来实现任务队列,它支持线程安全的队列操作。
- WorkerThread类表示一个工作线程,它有自己的任务队列,并会尝试执行自己的任务或窃取其他线程的任务。
- Task类表示一个可执行的任务,它简单地打印出正在执行任务的线程名称和任务ID。
- 在main方法中,我们创建了指定数量的工作线程,并向它们分配了任务。然后,让工作线程运行一段时间后停止。
这个示例是一个简化的版本,实际的生产环境中可能需要更复杂的同步机制和错误处理。此外,为了简化示例,任务被直接添加到了工作线程的任务队列中,而不是使用一个共享的窃取队列。在实际实现中,可以考虑使用一个共享的窃取队列来优化任务窃取过程。
测试输出如下:
可以看到线程0的任务被其它线程窃取执行了。
ForkJoinPool
new ForkJoinPool()
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);}
/*** Creates a {@code ForkJoinPool} with the given parameters, without* any security checks or parameter validation. Invoked directly by* makeCommonPool.*/
private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
默认线程数量是:Runtime.getRuntime().availableProcessors()
ForkJoinTask
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Abstract base class for tasks that run within a ForkJoinPool
. A ForkJoinTask
is a thread-like entity that is much lighter weight than a normal thread. Huge numbers of tasks and subtasks may be hosted by a small number of actual threads in a ForkJoinPool, at the price of some usage limitations.
任务提交处理如下:
steal逻辑
例子代码1
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分裂成两个子任务计算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// invokeAll(leftTask, rightTask);// 等待任务执行结束合并其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}static void testForkJoinPool() throws Exception{ForkJoinPool forkjoinPool = new ForkJoinPool();int sta = 1;int end = 100;//生成一个计算任务,计算连续区间范围的和ForkJoinTaskExample task = new ForkJoinTaskExample(sta, end);//执行一个任务Future<Integer> result = forkjoinPool.submit(task);System.out.println("result:" + result.get());}public static void main(String[] args) throws Exception{testForkJoinPool();TimeUnit.SECONDS.sleep(1);}
}
例子代码2
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;class MyTask implements Runnable {private final int taskId;public MyTask(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try{TimeUnit.MILLISECONDS.sleep(300);// 线程0执行的任务慢一点,可以被窃取if (taskId % 4 == 0) {TimeUnit.MILLISECONDS.sleep(800);}}catch (Exception e){}System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());}public int getTaskId() {return taskId;}
}public class Main {public static void main(String[] args) throws Exception {AtomicInteger taskIdGenerator = new AtomicInteger(0);ForkJoinPool forkjoinPool = new ForkJoinPool();// Add tasks to the shared queue (or directly to worker queues for simplicity in this example)for (int i = 0; i < 20; i++) {int taskId = taskIdGenerator.incrementAndGet();MyTask task = new MyTask(taskId);forkjoinPool.submit(task);}while (true){}}}
可以看到有窃取行为
相关文章:

work-stealing算法 ForkJoinPool
专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标: 重点是通过例子程序理解work-stealing算法原理 目录 work-stealing算法算法原理和优缺点介绍使用场景work-stealing例子代码 ForkJoinPoolnew ForkJoinPool(…...

DeepSeek Janus-Pro:多模态AI模型的突破与创新
近年来,人工智能领域取得了显著的进展,尤其是在多模态模型(Multimodal Models)方面。多模态模型能够同时处理和理解文本、图像等多种类型的数据,极大地扩展了AI的应用场景。DeepSeek(DeepSeek-V3 深度剖析:…...

STM32-时钟树
STM32-时钟树 时钟 时钟...

hot100_21. 合并两个有序链表
将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1: 输入:l1 [1,2,4], l2 [1,3,4] 输出:[1,1,2,3,4,4] 示例 2: 输入:l1 [], l2 [] 输出:[…...

代码讲解系列-CV(一)——CV基础框架
文章目录 一、环境配置IDE选择一套完整复现安装自定义cuda算子 二、Linux基础文件和目录操作查看显卡状态压缩和解压 三、常用工具和pipeline远程文件工具版本管理代码辅助工具 随手记录下一个晚课 一、环境配置 pytorch是AI框架用的很多,或者 其他是国内的框架 an…...

C++ Primer 标准库类型string
欢迎阅读我的 【CPrimer】专栏 专栏简介:本专栏主要面向C初学者,解释C的一些基本概念和基础语言特性,涉及C标准库的用法,面向对象特性,泛型特性高级用法。通过使用标准库中定义的抽象设施,使你更加适应高级…...

计算机网络安全与运维的关键 —— 常用端口全解析
目录 前言 常见端口分类及用途 20 端口(FTP 数据传输) 21 端口(FTP 消息控制) 22 端口(SSH) 23 端口(Telnet) 25 端口(SMTP) 53 端口(DNS&…...

Vue.js 的介绍与组件开发初步
Vue.js 的介绍与组件开发初步 Vue.js 的介绍与组件开发初步引言第一部分:Vue.js 基础入门1.1 什么是 Vue.js?1.2 搭建 Vue.js 开发环境安装 Node.js 和 npm安装 Vue CLI创建新项目运行示例 1.3 第一个 Vue.js 示例 第二部分:Vue.js 组件开发基…...

【仿12306项目】通过加“锁”,解决高并发抢票的超卖问题
文章目录 一. 测试工具二. 超卖现象演示三. 原因分析四. 解决办法方法一:加synchronized锁1. 单个服务节点情况2. 增加服务器节点,分布式环境synchronized失效演示 方法二:使用Redis分布式锁锁解决超卖问题1. 添加Redis分布式锁2. 结果 方法三…...

wow-agent---task4 MetaGPT初体验
先说坑: 1.使用git clone模式安装metagpt 2.模型尽量使用在线模型或本地高参数模型。 这里使用python3.10.11调试成功 一,安装 安装 | MetaGPT,参考这里的以开发模型进行安装 git clone https://github.com/geekan/MetaGPT.git cd /you…...

MVANet——小范围内捕捉高分辨率细节而在大范围内不损失精度的强大的背景消除模型
一、概述 前景提取(背景去除)是现代计算机视觉的关键挑战之一,在各种应用中的重要性与日俱增。在图像编辑和视频制作中有效地去除背景不仅能提高美学价值,还能提高工作流程的效率。在要求精确度的领域,如医学图像分析…...

94,【2】buuctf web [安洵杯 2019]easy_serialize_php
进入靶场 可以查看源代码 <?php // 从 GET 请求中获取名为 f 的参数值,并赋值给变量 $function // 符号用于抑制可能出现的错误信息 $function $_GET[f];// 定义一个名为 filter 的函数,用于过滤字符串中的敏感词汇 function filter($img) {// 定义…...

LabVIEW如何有效地进行数据采集?
数据采集(DAQ)是许多工程项目中的核心环节,无论是测试、监控还是控制系统,准确、高效的数据采集都是至关重要的。LabVIEW作为一个图形化编程环境,提供了丰富的功能来实现数据采集,确保数据的实时性与可靠性…...

6 [新一代Github投毒针对网络安全人员钓鱼]
0x01 前言 在Github上APT组织“海莲花”发布存在后门的提权BOF,通过该项目针对网络安全从业人员进行钓鱼。不过其实早在几年前就已经有人对Visual Studio项目恶意利用进行过研究,所以投毒的手法也不算是新的技术。但这次国内有大量的安全从业者转发该钓…...

《Origin画百图》之脊线图
1.数据准备:将数据设置为y 2.选择绘图>统计图>脊线图 3.生成基础图形,并不好看,接下来对图形属性进行设置 4.双击图形>选择图案>颜色选择按点>Y值 5.这里发现颜色有色阶,过度并不平滑,需要对色阶进行更…...

linux 函数 sem_init () 信号量、sem_destroy()
(1) (2) 代码举例: #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <semaphore.h> #include <unistd.h>sem_t semaphore;void* thread_function(void* arg) …...

Kafka架构
引言 Kafka 凭借其独树一帜的分区架构,在消息中间件领域展现出了卓越的性能表现。其分区架构不仅赋予了 Kafka 强大的并行计算能力,使其能够高效处理海量数据,还显著提升了系统的容灾能力,确保在复杂的运行环境中始终保持稳定可靠…...

刷题记录 动态规划-2: 509. 斐波那契数
题目:509. 斐波那契数 难度:简单 斐波那契数 (通常用 F(n) 表示)形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始,后面的每一项数字都是前面两项数字的和。也就是: F(0) 0,F(1) 1 F(n…...

RDP协议详解
以下内容包含对 RDP(Remote Desktop Protocol,远程桌面协议)及其开源实现 FreeRDP 的较为系统、深入的讲解,涵盖协议概要、历史沿革、核心原理、安全机制、安装与使用方法、扩展与未来发展趋势等方面, --- ## 一、引…...

设计模式的艺术-观察者模式
行为型模式的名称、定义、学习难度和使用频率如下表所示: 1.如何理解观察者模式 一个对象的状态或行为的变化将导致其他对象的状态或行为也发生改变,它们之间将产生联动,正所谓“触一而牵百发”。为了更好地描述对象之间存在的这种一对多&…...

【C语言设计模式学习笔记1】面向接口编程/简单工厂模式/多态
面向接口编程可以提供更高级的抽象,实现的时候,外部不需要知道内部的具体实现,最简单的是使用简单工厂模式来进行实现,比如一个Sensor具有多种表示形式,这时候可以在给Sensor结构体添加一个enum类型的type,…...

Baklib如何优化企业知识管理提升团队协作与创新能力分析
内容概要 在现代企业中,知识管理已经成为提升竞争力的关键因素之一。Baklib作为一种全面的知识管理解决方案,致力于帮助企业高效整合和运用内部及外部知识资源。它通过建立统一的知识管理框架,打破了部门之间的信息壁垒,实现了跨…...

Dubbo view
1、 说说Dubbo核心的配置有哪些? 答: 配置 配置说明 dubbo:service 服务配置 dubbo:reference 引用配置 dubbo:protocol 协议配置 dubbo:application 应用配置 dubbo:module 模块配置 dubbo:registry 注册中心配置 dubbo:monitor 监控中心配置 dubbo:pr…...

分享刷题过程中有价值的两道题目
小编在这里先祝大家新的一年里所愿皆得,万事顺意,天天开心!!! 一.水仙花数 题目描述: 求100∼999中的水仙花数。若三位数ABCA^3B^3C^3,则称ABC为水仙花数。例如153,135333112527153&…...

蓝桥杯例题六
奋斗是一种态度,也是一种生活方式。无论我们面对什么样的困难和挑战,只要心怀梦想,坚持不懈地努力,就一定能够迈向成功的道路。每一次失败都是一次宝贵的经验,每一次挫折都是一次锻炼的机会。在困难面前,我…...

DeepSeek 详细使用教程
1. 简介 DeepSeek 是一款基于人工智能技术的多功能工具,旨在帮助用户高效处理和分析数据、生成内容、解答问题、进行语言翻译等。无论是学术研究、商业分析还是日常使用,DeepSeek 都能提供强大的支持。本教程将详细介绍 DeepSeek 的各项功能及使用方法。…...

《tcp/ip协议详解》,tcp/ip协议详解
TCP/IP协议(Transmission Control Protocol/Internet Protocol)是网络通信协议的一种,也被称为“Internet协议”,是Internet上运行的基本协议,广泛应用于各种网络环境和应用场合。以下是对TCP/IP协议的详细解析&#x…...

游戏引擎 Unity - Unity 设置为简体中文、Unity 创建项目
Unity Unity 首次发布于 2005 年,属于 Unity Technologies Unity 使用的开发技术有:C# Unity 的适用平台:PC、主机、移动设备、VR / AR、Web 等 Unity 的适用领域:开发中等画质中小型项目 Unity 适合初学者或需要快速上手的开…...

【数据结构】_时间复杂度相关OJ(力扣版)
目录 1. 示例1:消失的数字 思路1:等差求和 思路2:异或运算 思路3:排序+二分查找 2. 示例2:轮转数组 思路1:逐次轮转 思路2:三段逆置(经典解法) 思路3…...

[Java]异常
在程序运行时,如果遇到问题(比如除以零、文件找不到等),程序会发生异常。异常就像是程序的“错误提醒”,当程序运行中出错时,它会停止,给出一个错误信息。我们可以通过异常处理来控制这些错误&a…...