并发包工具之 批量处理任务 CompletionService(异步)、CompletableFuture(回调)
文章目录
- 一、处理异步任务并获取返回值——CompletionService
- 二、线程池
- 三、Callable 与 Future
- 四、通过回调方式处理可组合编排任务——CompletableFuture
一、处理异步任务并获取返回值——CompletionService
特点描述:
对于比较复杂的计算,把任务进行提交,并发执行,哪个任务先执行完,get()方法就会获取到相应的任务结果。
范式:
1、
假设有一组针对某个问题的任务solvers(需要实现Callable接口,任务的具体逻辑就在其call方法里),每个任务都返回一个类型为Result的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用:
void solve(Executor e,Collection<Callable<Result>> solvers)throws InterruptedException, ExecutionException {CompletionService<Result> ecs= new ExecutorCompletionService<Result>(e);for (Callable<Result> s : solvers)ecs.submit(s);int n = solvers.size();for (int i = 0; i < n; ++i) {Result r = ecs.take().get();if (r != null)use(r);}}
2、
假设想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务(比如:多仓库文件/镜像下载,从最近的服务中心下载后,终止其他下载过程)
void solve(Executor e,Collection<Callable<Result>> solvers)throws InterruptedException {CompletionService<Result> ecs= new ExecutorCompletionService<Result>(e);int n = solvers.size();List<Future<Result>> futures= new ArrayList<Future<Result>>(n);Result result = null;try {for (Callable<Result> s : solvers)futures.add(ecs.submit(s));for (int i = 0; i < n; ++i) {try {Result r = ecs.take().get();if (r != null) {result = r;break;}} catch (ExecutionException ignore) {}}}finally {for (Future<Result> f : futures)// 注意这里的参数给的是 true,详解同样在前序 Future 源码分析文章中f.cancel(true);}if (result != null)use(result);}
总得来说分两步:
1、提交异步任务 submit方法(submit最终会委托给内部的 executor 去执行任务)
2、从队列中拿取并移除元素 take(如果队列为空,那么调用 take() 方法的线程会被阻塞)/poll(…不会被阻塞,返回null)/poll带超时参数(获取并移除阻塞队列中的第一个元素,如果超时时间到而队列还是空,该方法返回null) 方法
实现原理:
将异步任务的生产、任务完成结果的消费进行解耦,类似mq,哪个任务先执行完,就把结果放到队列中。
唯一实现类:
ExecutorCompletionService;阻塞队列默认是 LinkedBlockingQueue
二、线程池
为什么要用线程池
∵ 手动创建线程的缺点:
1、不受控,系统资源有限,每个人如果都创建的话,标准不一样,线程疯狂抢占资源.,混乱…
2、开销大,创建一个线程需要调用操作系统内核API,然后操作系统要为线程分配一系列资源,创建个线程啥也不干大概需要1M左右大小。
线程池可以统一管理、控制最大并发数并实现拒绝策略、隔离线程环境;当执行大量异步任务时,线程池里的线程能复用,不用频繁创建和销毁,能够提供好的性能。
Java并发包里的线程池——ThreadPoolExecutor; (接口是ExecutorService)
Spring对线程池的封装——ThreadPoolTaskExecutor
关于线程池核心线程数的设置:
CPU是时间片轮转机制来让线程占用的,也就是说程序表面上是同时进行的,实际上是切换执行的,CPU每个时刻只能由一个线程占用,比如 4核CPU,只能同时跑4个线程。
对于CPU密集型程序(如运算、逻辑判断等,I/O操作可以在短时间完成,但CPU运算比较多)
——最佳线程数量=CPU核数+1,这个1可以理解为替补,如果某个线程因为发生错误或其他原因暂停了,这个线程可以继续工作。
对于I/O密集型(如涉及网络、磁盘、内存等)
——最佳线程数=CPU核心数 * (1/CPU利用率)=CPU核心数 * (1 + (I/O耗时/CPU耗时)),如果几乎都是I/O耗时,可取2N+1(1为替补)
(p.s.线程数不是越多越好,线程上下文切换开销不小)
三、Callable 与 Future
Runnable接口的方法没有返回值;Callable 是泛型接口,可以返回指定类型的结果。
当提交一个Callable 任务后,会同时获得一个Future对象,然后,在主线程某个时刻调用Future对象的get() 方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
四、通过回调方式处理可组合编排任务——CompletableFuture
特点描述:
CompletableFuture是由 Java 8 引入的,在 Java 8之 前一般通过 Future 实现异步,CompletableFuture 对 Future 进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,比如步骤1、2、3存在依赖关系,支持对步骤进一步的编排,降低依赖之间的阻塞。
使用:
如上图所示,这里描绘的是一个业务接口的流程,其中包括 CF1\CF2\CF3\CF4\CF5 共5个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次 RPC 调用、一次数据库操作或者是一次本地方法调用等,在使用 CompletableFuture 进行异步化编程时,图中的每个步骤都会产生一个 CompletableFuture 对象,最终结果也会用一个 CompletableFuture 来进行表示。(只看第一层的话 好像跟 CompletionService 效果差不多… 都可以异步执行批量任务并拿到结果…)
1、零依赖,CompletableFuture 的创建
比如图中所示的 CF1、CF2,可以有以下方式:
// 1、使用 runAsync 或 supplyAsync 发起异步调用// 线程池ExecutorService executorService = Executors.newFixedThreadPool(5);CompletableFuture<String> CF1 = CompletableFuture.supplyAsync(() -> {return "CF1 result";}, executorService);// 2、CompletableFuture.completedFuture() 直接创建一个已完成状态的 CompletableFuture<CompletableFuture<String> CF2 = CompletableFuture.completedFuture("CF2 result");// 3、先初始化一个未完成的 CompletableFutureCompletableFuture<String> CF3 = new CompletableFuture<>();// 然后通过complete()、completeExceptionally(),完成该CompletableFutureCF3.complete("CF3result");
2、一元依赖,依赖一个 CompletableFuture
比如图中所示的 CF3、CF5,可以用 thenApply、thenAccept、thenCompose 等方法来实现:
// result为CF1的结果CompletableFuture<String> CF3=CF1.thenApply(result->{return "CF3result";});
3、二元依赖:依赖两个 CompletableFuture
比如图中所示的 CF4,这种二元依赖可以通过 thenCombine 等回调来实现:
// result1、result2分别为CF1、CF2的结果CompletableFuture<String> CF4 = CF1.thenCombine(CF2, (result1, result2) -> {return "CF4result";});
4、多元依赖,依赖多个 CompletableFuture
比如图中所示的 CF6,依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过 allOf 或 anyOf 方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf:
CompletableFuture<Void> CF6 = CompletableFuture.allOf(CF3, CF4, CF5);CompletableFuture<String> result = CF6.thenApply(v ->{// 这里的 join是完成任务后用来获取结果的,并不会阻塞// 因为传给 thenApply 的函数都是在 CF3、CF4、CF5 全都完成时才会执行String result3 = CF3.join();String result4 = CF4.join();String result5 = CF5.join();// 根据 result3、result4、result5组装最终 resultreturn result3 + result4 + result5;});
如果只用一层的话,异步执行批量任务并拿到总的结果,参考api里 allOf:
代码示例:
// 任务入参集合ArrayList<String> paramList = new ArrayList<>();// 用于汇总所有结果ArrayList<String> resultList = new ArrayList<>();CompletableFuture.allOf(paramList.stream().map(string ->CompletableFuture.supplyAsync(() ->// 这里返回了本身,实际上也可以是具体的方法string,asyncServiceExecutor)// thenApply是对结果做简单映射,类似于Stream.map,list->list就是原样往下传递,这里不使用thenApply也行.thenApply(list -> list).whenComplete((result, e) -> {// 对异常结果的处理if (e != null) System.out.println("exception");// 汇总结果resultList.add(result);})).toArray(CompletableFuture[]::new)// 完成后返回结果值,如果异常完成则抛出(未经检查)异常,相当于一个等待任务完成的动作).join();
参考文档:
https://dayarch.top/p/how-many-threads-should-be-created.html
https://segmentfault.com/a/1190000023129592?utm_source=sf-similar-article
https://segmentfault.com/a/1190000023587881
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
相关文章:

并发包工具之 批量处理任务 CompletionService(异步)、CompletableFuture(回调)
文章目录一、处理异步任务并获取返回值——CompletionService二、线程池三、Callable 与 Future四、通过回调方式处理可组合编排任务——CompletableFuture一、处理异步任务并获取返回值——CompletionService 特点描述: 对于比较复杂的计算,把…...

验收测试分类
α测试 Alpha 是内测版本,即现在所说的CB。 此版本表示该软件仅仅是一个初步完成品, 通常只在软件开发者内部交流, 也有很少一部分发布给专业测试人员。 一般而言, 该版本软件的bug 较多, 普通用户最好不要安装。 β测试 Beta是公测版本,是对所有用户…...

因新硬件支持内核问题Ubuntu 22.04.2推迟发布
导读Ubuntu 22.04.2 LTS 原定于 2 月 9 日发布。但 Canonical 宣布该版本因各种问题不得不推迟两周,定于 2 月 23 日发布。 Ubuntu 22.04.2 LTS 原定于 2 月 9 日发布。但 Canonical 宣布该版本因各种问题不得不推迟两周,定于 2 月 23 日发布。 Canonica…...

agent扩展-自定义外部加载路径
自定义classLoader实现加载外部jar, 以skywalking agent 类加载器为例子 整体思路 扩展findClass ,解决loadClass可以查找到扩展findResource,解决getResources可以获取到资源 基本原理 ClassLoader loadClass的加载顺序 findLoadedClass 加载本地已经…...
Elasticsearch使用篇 - 指标聚合
指标聚合 指标聚合从聚合文档中提取出指标进行计算。可以从文档的字段或者使用脚本方式进行提取。 聚合统计可以同时返回明细数据,可以分页查询,可以返回总数量。 可以结合查询条件,限制数据范围,结合倒排索引列式存储。 指标…...
Python生命周期及内存管理
文章目录 一、Python的生命周期 1、概念2、如何监听生命周期二、内存管理 1.存储2.垃圾回收3.引用计数一、生命周期: 1、概念:一个对象从创建到消亡的过程 当一个对象呗创建是,会在内存中分配响应的内存空间进行存储 当这个对象不再使…...

Elasticsearch7.8.0版本进阶——数据写流程
目录一、数据写流程概述二、数据写流程步骤2.1、数据写流程图2.2、数据写流程步骤(新建索引和删除文档所需要的步骤顺序)2.3、数据写流程的请求参数一、数据写流程概述 新建、删除索引和新建、删除文档的请求都是写操作, 必须在主分片上面完…...

化学试剂Glutaric Acid-PEG-Glutaric Acid,GA-PEG-GA,戊二酸-聚乙二醇-戊二酸
一:产品描述 1、名称 英文:Glutaric Acid-PEG-Glutaric Acid,GA-PEG-GA 中文:戊二酸-聚乙二醇-戊二酸 2、CAS编号:N/A 3、所属分类:Carboxylic acid PEG 4、分子量:可定制, 戊…...

知识图谱业务落地技术推荐之国内知识图谱平台汇总(竞品)[阿里、腾讯、华为等】
各位可以参考国内知识图谱平台产品进行对技术链路搭建和产品参考提供借鉴。...

ABC 289 G - Shopping in AtCoder store 数学推导+凸包
大意: n个顾客,每个人有一个购买的欲望bi,m件物品,每一件物品有一个价值ci,每一个顾客会买商品当且仅当bici>定价. 现在要求对每一个商品定价,求出它的最大销售值(数量*定价) n,m<2e5 思路&#x…...
ARM Linux 如何在sysfs用户态命令行中控制 GPIO 引脚?
ARM Linux 如何在sysfs用户态命令行中控制 GPIO 引脚?我们在开发工作中,经常需要确定内核gpio驱动,是否有异常,或者在没有应用的情况下,像控制某个外设,这时我们就可以在控制台命令行中,用命令导…...

【Linux】生产者消费者模型 - 详解
目录 一.生产者消费者模型概念 1.为何要使用生产者消费者模型 2.生产者消费者之间的关系 3.生产者消费者模型的优点 二.基于阻塞队列的生产消费模型 1.在阻塞队列中的三种关系 2.BlockingQueue.hpp - 阻塞队列类 3.LockGurad.hpp - RAII互斥锁类 4.Task.hpp - 在阻塞队…...

源码深度解析Spring Bean的加载
在应用spring 的过程中,就会涉及到bean的加载,bean的加载经历一个相当复杂的过程,bean的加载入口如下: 使用getBean()方法进行加载Bean,最终调用的是AbstractBeanFactory.doGetBean() 进行Bean的…...

STL——priority_queue
一、priority_queue介绍及使用 1.priority_queue文档介绍 (1)优先队列是一种容器适配器,根据严格的弱排序标准,它的第一个元素总是它所包含的元素中最大的。 (2)此上下文类似与堆,在堆中可以…...

Springboot集成工作流Activity
介绍 官网:https://www.activiti.org/ 一 、工作流介绍 1.工作流(workflow) 就是通过计算机对业务流程自动化执行管理,它主要解决的是“使在多个参与这之间按照某种预定义规则自动化进行传递文档、信息或任务的过程,…...

2023软件测试工程师涨薪攻略,3年如何达到月薪30K?
1.软件测试如何实现涨薪 首先涨薪并不是从8000涨到9000这种涨薪,而是从8000涨到15K加到25K的涨薪。基本上三年之内就可以实现。 如果我们只是普通的有应届毕业生或者是普通本科那我们就只能从小公司开始慢慢往上走。 有些同学想去做测试,是希望能够日…...

Java面试——Spring Bean相关知识
目录 1.Bean的定义 2.Bean的生命周期 3.BeanFactory及Factory Bean 4.Bean的作用域 5.Bean的线程安全问题 1.Bean的定义 JavaBean是描述Java的软件组件模型。在Java模型中,通过JavaBean可以无限扩充Java程序的功能,通过JavaBean的组合可以快速的生…...

上班在群里摸鱼,逮到一个字节8年测试开发,聊过之后羞愧难当...
老话说的好,这人呐,一旦在某个领域鲜有敌手了,就会闲得某疼。前几天我在上班摸鱼刷群的时候认识了一位字节测试开发大佬,在字节工作了8年,因为本人天赋比较高,平时工作也兢兢业业,现在企业内有一…...

HTTP、WebSocket和Socket.IO
一、HTTP协议 HTTP协议是Hyper Text Transfer Protocol(超文本传输协议)。HTTP 协议和 TCP/IP 协议族内的其他众多的协议相同, 用于客户端和服务器之间的通信。请求访问文本或图像等资源的一端称为客户端, 而提供资源响应的一端称…...
Fluent Python 笔记 第 11 章 接口:从协议到抽象基类
本章讨论的话题是接口:从鸭子类型的代表特征动态协议,到使接口更明确、能验证实现是否符合规定的抽象基类(Abstract Base Class,ABC)。 11.1 Python 文化中的接口和协议 对 Python 程序员来说,“X 类对象”“X 协 议”和“X 接口”都是一个…...

C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...

回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...