并发包工具之 批量处理任务 CompletionService(异步)、CompletableFuture(回调)
文章目录
- 一、处理异步任务并获取返回值——CompletionService
- 二、线程池
- 三、Callable 与 Future
- 四、通过回调方式处理可组合编排任务——CompletableFuture
一、处理异步任务并获取返回值——CompletionService
特点描述:
对于比较复杂的计算,把任务进行提交,并发执行,哪个任务先执行完,get()方法就会获取到相应的任务结果。
范式:
1、
假设有一组针对某个问题的任务solvers(需要实现Callable接口,任务的具体逻辑就在其call方法里),每个任务都返回一个类型为Result的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用:
void solve(Executor e,Collection<Callable<Result>> solvers)throws InterruptedException, ExecutionException {CompletionService<Result> ecs= new ExecutorCompletionService<Result>(e);for (Callable<Result> s : solvers)ecs.submit(s);int n = solvers.size();for (int i = 0; i < n; ++i) {Result r = ecs.take().get();if (r != null)use(r);}}
2、
假设想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务(比如:多仓库文件/镜像下载,从最近的服务中心下载后,终止其他下载过程)
void solve(Executor e,Collection<Callable<Result>> solvers)throws InterruptedException {CompletionService<Result> ecs= new ExecutorCompletionService<Result>(e);int n = solvers.size();List<Future<Result>> futures= new ArrayList<Future<Result>>(n);Result result = null;try {for (Callable<Result> s : solvers)futures.add(ecs.submit(s));for (int i = 0; i < n; ++i) {try {Result r = ecs.take().get();if (r != null) {result = r;break;}} catch (ExecutionException ignore) {}}}finally {for (Future<Result> f : futures)// 注意这里的参数给的是 true,详解同样在前序 Future 源码分析文章中f.cancel(true);}if (result != null)use(result);}
总得来说分两步:
1、提交异步任务 submit方法(submit最终会委托给内部的 executor 去执行任务)
2、从队列中拿取并移除元素 take(如果队列为空,那么调用 take() 方法的线程会被阻塞)/poll(…不会被阻塞,返回null)/poll带超时参数(获取并移除阻塞队列中的第一个元素,如果超时时间到而队列还是空,该方法返回null) 方法
实现原理:
将异步任务的生产、任务完成结果的消费进行解耦,类似mq,哪个任务先执行完,就把结果放到队列中。
唯一实现类:
ExecutorCompletionService;阻塞队列默认是 LinkedBlockingQueue
二、线程池
为什么要用线程池
∵ 手动创建线程的缺点:
1、不受控,系统资源有限,每个人如果都创建的话,标准不一样,线程疯狂抢占资源.,混乱…
2、开销大,创建一个线程需要调用操作系统内核API,然后操作系统要为线程分配一系列资源,创建个线程啥也不干大概需要1M左右大小。
线程池可以统一管理、控制最大并发数并实现拒绝策略、隔离线程环境;当执行大量异步任务时,线程池里的线程能复用,不用频繁创建和销毁,能够提供好的性能。
Java并发包里的线程池——ThreadPoolExecutor; (接口是ExecutorService)
Spring对线程池的封装——ThreadPoolTaskExecutor
关于线程池核心线程数的设置:
CPU是时间片轮转机制来让线程占用的,也就是说程序表面上是同时进行的,实际上是切换执行的,CPU每个时刻只能由一个线程占用,比如 4核CPU,只能同时跑4个线程。
对于CPU密集型程序(如运算、逻辑判断等,I/O操作可以在短时间完成,但CPU运算比较多)
——最佳线程数量=CPU核数+1,这个1可以理解为替补,如果某个线程因为发生错误或其他原因暂停了,这个线程可以继续工作。
对于I/O密集型(如涉及网络、磁盘、内存等)
——最佳线程数=CPU核心数 * (1/CPU利用率)=CPU核心数 * (1 + (I/O耗时/CPU耗时)),如果几乎都是I/O耗时,可取2N+1(1为替补)
(p.s.线程数不是越多越好,线程上下文切换开销不小)
三、Callable 与 Future
Runnable接口的方法没有返回值;Callable 是泛型接口,可以返回指定类型的结果。
当提交一个Callable 任务后,会同时获得一个Future对象,然后,在主线程某个时刻调用Future对象的get() 方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
四、通过回调方式处理可组合编排任务——CompletableFuture
特点描述:
CompletableFuture是由 Java 8 引入的,在 Java 8之 前一般通过 Future 实现异步,CompletableFuture 对 Future 进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,比如步骤1、2、3存在依赖关系,支持对步骤进一步的编排,降低依赖之间的阻塞。
使用:
如上图所示,这里描绘的是一个业务接口的流程,其中包括 CF1\CF2\CF3\CF4\CF5 共5个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次 RPC 调用、一次数据库操作或者是一次本地方法调用等,在使用 CompletableFuture 进行异步化编程时,图中的每个步骤都会产生一个 CompletableFuture 对象,最终结果也会用一个 CompletableFuture 来进行表示。(只看第一层的话 好像跟 CompletionService 效果差不多… 都可以异步执行批量任务并拿到结果…)
1、零依赖,CompletableFuture 的创建
比如图中所示的 CF1、CF2,可以有以下方式:
// 1、使用 runAsync 或 supplyAsync 发起异步调用// 线程池ExecutorService executorService = Executors.newFixedThreadPool(5);CompletableFuture<String> CF1 = CompletableFuture.supplyAsync(() -> {return "CF1 result";}, executorService);// 2、CompletableFuture.completedFuture() 直接创建一个已完成状态的 CompletableFuture<CompletableFuture<String> CF2 = CompletableFuture.completedFuture("CF2 result");// 3、先初始化一个未完成的 CompletableFutureCompletableFuture<String> CF3 = new CompletableFuture<>();// 然后通过complete()、completeExceptionally(),完成该CompletableFutureCF3.complete("CF3result");
2、一元依赖,依赖一个 CompletableFuture
比如图中所示的 CF3、CF5,可以用 thenApply、thenAccept、thenCompose 等方法来实现:
// result为CF1的结果CompletableFuture<String> CF3=CF1.thenApply(result->{return "CF3result";});
3、二元依赖:依赖两个 CompletableFuture
比如图中所示的 CF4,这种二元依赖可以通过 thenCombine 等回调来实现:
// result1、result2分别为CF1、CF2的结果CompletableFuture<String> CF4 = CF1.thenCombine(CF2, (result1, result2) -> {return "CF4result";});
4、多元依赖,依赖多个 CompletableFuture
比如图中所示的 CF6,依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过 allOf 或 anyOf 方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf:
CompletableFuture<Void> CF6 = CompletableFuture.allOf(CF3, CF4, CF5);CompletableFuture<String> result = CF6.thenApply(v ->{// 这里的 join是完成任务后用来获取结果的,并不会阻塞// 因为传给 thenApply 的函数都是在 CF3、CF4、CF5 全都完成时才会执行String result3 = CF3.join();String result4 = CF4.join();String result5 = CF5.join();// 根据 result3、result4、result5组装最终 resultreturn result3 + result4 + result5;});
如果只用一层的话,异步执行批量任务并拿到总的结果,参考api里 allOf:
代码示例:
// 任务入参集合ArrayList<String> paramList = new ArrayList<>();// 用于汇总所有结果ArrayList<String> resultList = new ArrayList<>();CompletableFuture.allOf(paramList.stream().map(string ->CompletableFuture.supplyAsync(() ->// 这里返回了本身,实际上也可以是具体的方法string,asyncServiceExecutor)// thenApply是对结果做简单映射,类似于Stream.map,list->list就是原样往下传递,这里不使用thenApply也行.thenApply(list -> list).whenComplete((result, e) -> {// 对异常结果的处理if (e != null) System.out.println("exception");// 汇总结果resultList.add(result);})).toArray(CompletableFuture[]::new)// 完成后返回结果值,如果异常完成则抛出(未经检查)异常,相当于一个等待任务完成的动作).join();
参考文档:
https://dayarch.top/p/how-many-threads-should-be-created.html
https://segmentfault.com/a/1190000023129592?utm_source=sf-similar-article
https://segmentfault.com/a/1190000023587881
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
相关文章:
并发包工具之 批量处理任务 CompletionService(异步)、CompletableFuture(回调)
文章目录一、处理异步任务并获取返回值——CompletionService二、线程池三、Callable 与 Future四、通过回调方式处理可组合编排任务——CompletableFuture一、处理异步任务并获取返回值——CompletionService 特点描述: 对于比较复杂的计算,把…...
验收测试分类
α测试 Alpha 是内测版本,即现在所说的CB。 此版本表示该软件仅仅是一个初步完成品, 通常只在软件开发者内部交流, 也有很少一部分发布给专业测试人员。 一般而言, 该版本软件的bug 较多, 普通用户最好不要安装。 β测试 Beta是公测版本,是对所有用户…...
因新硬件支持内核问题Ubuntu 22.04.2推迟发布
导读Ubuntu 22.04.2 LTS 原定于 2 月 9 日发布。但 Canonical 宣布该版本因各种问题不得不推迟两周,定于 2 月 23 日发布。 Ubuntu 22.04.2 LTS 原定于 2 月 9 日发布。但 Canonical 宣布该版本因各种问题不得不推迟两周,定于 2 月 23 日发布。 Canonica…...
agent扩展-自定义外部加载路径
自定义classLoader实现加载外部jar, 以skywalking agent 类加载器为例子 整体思路 扩展findClass ,解决loadClass可以查找到扩展findResource,解决getResources可以获取到资源 基本原理 ClassLoader loadClass的加载顺序 findLoadedClass 加载本地已经…...
Elasticsearch使用篇 - 指标聚合
指标聚合 指标聚合从聚合文档中提取出指标进行计算。可以从文档的字段或者使用脚本方式进行提取。 聚合统计可以同时返回明细数据,可以分页查询,可以返回总数量。 可以结合查询条件,限制数据范围,结合倒排索引列式存储。 指标…...
Python生命周期及内存管理
文章目录 一、Python的生命周期 1、概念2、如何监听生命周期二、内存管理 1.存储2.垃圾回收3.引用计数一、生命周期: 1、概念:一个对象从创建到消亡的过程 当一个对象呗创建是,会在内存中分配响应的内存空间进行存储 当这个对象不再使…...
Elasticsearch7.8.0版本进阶——数据写流程
目录一、数据写流程概述二、数据写流程步骤2.1、数据写流程图2.2、数据写流程步骤(新建索引和删除文档所需要的步骤顺序)2.3、数据写流程的请求参数一、数据写流程概述 新建、删除索引和新建、删除文档的请求都是写操作, 必须在主分片上面完…...
化学试剂Glutaric Acid-PEG-Glutaric Acid,GA-PEG-GA,戊二酸-聚乙二醇-戊二酸
一:产品描述 1、名称 英文:Glutaric Acid-PEG-Glutaric Acid,GA-PEG-GA 中文:戊二酸-聚乙二醇-戊二酸 2、CAS编号:N/A 3、所属分类:Carboxylic acid PEG 4、分子量:可定制, 戊…...
知识图谱业务落地技术推荐之国内知识图谱平台汇总(竞品)[阿里、腾讯、华为等】
各位可以参考国内知识图谱平台产品进行对技术链路搭建和产品参考提供借鉴。...
ABC 289 G - Shopping in AtCoder store 数学推导+凸包
大意: n个顾客,每个人有一个购买的欲望bi,m件物品,每一件物品有一个价值ci,每一个顾客会买商品当且仅当bici>定价. 现在要求对每一个商品定价,求出它的最大销售值(数量*定价) n,m<2e5 思路&#x…...
ARM Linux 如何在sysfs用户态命令行中控制 GPIO 引脚?
ARM Linux 如何在sysfs用户态命令行中控制 GPIO 引脚?我们在开发工作中,经常需要确定内核gpio驱动,是否有异常,或者在没有应用的情况下,像控制某个外设,这时我们就可以在控制台命令行中,用命令导…...
【Linux】生产者消费者模型 - 详解
目录 一.生产者消费者模型概念 1.为何要使用生产者消费者模型 2.生产者消费者之间的关系 3.生产者消费者模型的优点 二.基于阻塞队列的生产消费模型 1.在阻塞队列中的三种关系 2.BlockingQueue.hpp - 阻塞队列类 3.LockGurad.hpp - RAII互斥锁类 4.Task.hpp - 在阻塞队…...
源码深度解析Spring Bean的加载
在应用spring 的过程中,就会涉及到bean的加载,bean的加载经历一个相当复杂的过程,bean的加载入口如下: 使用getBean()方法进行加载Bean,最终调用的是AbstractBeanFactory.doGetBean() 进行Bean的…...
STL——priority_queue
一、priority_queue介绍及使用 1.priority_queue文档介绍 (1)优先队列是一种容器适配器,根据严格的弱排序标准,它的第一个元素总是它所包含的元素中最大的。 (2)此上下文类似与堆,在堆中可以…...
Springboot集成工作流Activity
介绍 官网:https://www.activiti.org/ 一 、工作流介绍 1.工作流(workflow) 就是通过计算机对业务流程自动化执行管理,它主要解决的是“使在多个参与这之间按照某种预定义规则自动化进行传递文档、信息或任务的过程,…...
2023软件测试工程师涨薪攻略,3年如何达到月薪30K?
1.软件测试如何实现涨薪 首先涨薪并不是从8000涨到9000这种涨薪,而是从8000涨到15K加到25K的涨薪。基本上三年之内就可以实现。 如果我们只是普通的有应届毕业生或者是普通本科那我们就只能从小公司开始慢慢往上走。 有些同学想去做测试,是希望能够日…...
Java面试——Spring Bean相关知识
目录 1.Bean的定义 2.Bean的生命周期 3.BeanFactory及Factory Bean 4.Bean的作用域 5.Bean的线程安全问题 1.Bean的定义 JavaBean是描述Java的软件组件模型。在Java模型中,通过JavaBean可以无限扩充Java程序的功能,通过JavaBean的组合可以快速的生…...
上班在群里摸鱼,逮到一个字节8年测试开发,聊过之后羞愧难当...
老话说的好,这人呐,一旦在某个领域鲜有敌手了,就会闲得某疼。前几天我在上班摸鱼刷群的时候认识了一位字节测试开发大佬,在字节工作了8年,因为本人天赋比较高,平时工作也兢兢业业,现在企业内有一…...
HTTP、WebSocket和Socket.IO
一、HTTP协议 HTTP协议是Hyper Text Transfer Protocol(超文本传输协议)。HTTP 协议和 TCP/IP 协议族内的其他众多的协议相同, 用于客户端和服务器之间的通信。请求访问文本或图像等资源的一端称为客户端, 而提供资源响应的一端称…...
Fluent Python 笔记 第 11 章 接口:从协议到抽象基类
本章讨论的话题是接口:从鸭子类型的代表特征动态协议,到使接口更明确、能验证实现是否符合规定的抽象基类(Abstract Base Class,ABC)。 11.1 Python 文化中的接口和协议 对 Python 程序员来说,“X 类对象”“X 协 议”和“X 接口”都是一个…...
【Spark分布式内存计算框架——Spark Core】11. Spark 内核调度(下)
8.5 Spark 基本概念 Spark Application运行时,涵盖很多概念,主要如下表格: 官方文档:http://spark.apache.org/docs/2.4.5/cluster-overview.html#glossary Application:指的是用户编写的Spark应用程序/代码&#x…...
Java中的函数
1.String.trim() : 主要有2个用法: 1、就是去掉字符串中前后的空白;这个方法的主要可以使用在判断用户输入的密码之类的。 2、它不仅可以去除空白,还可以去除字符串中的制表符,如 ‘\t’,\n等。 2.Integer.parseInt() : 字符串…...
实验6-霍纳法则及变治技术
目录 1.霍纳法则(Horners rule) 2.堆排序 3.求a的n次幂 1.霍纳法则(Horners rule) 【问题描述】用霍纳法则求一个多项式在一个给定点的值 【输入形式】输入三行,第一行是一个整数n,表示的是多项式的最高次数;第二行多项式的系数组P[0...n](从低到高存储);第三行是…...
IP地址:揭晓安欣警官自证清白的黑科技
《狂飙》这部电视剧,此从播出以来可谓是火爆了,想必大家都是看过的。剧中,主人公“安欣”是一名警察。一直在与犯罪分子做斗争。 莽村的李顺案中,有匿名者这个案件在网上发帖恶意造谣,说安欣是黑恶势力的保护伞&#…...
考研复试机试 | C++
目录1.盛水最多的容器<11>题目代码:2.整数转罗马数字题目:代码:3. 清华大学机试题 abc题目题解4.清华大学机试题 反序数题目描述代码对称平方数题目代码:5. 杭电上机题 叠筐题目:代码pass:关于清华大…...
第四章.误差反向传播法—误差反向传播法实现手写数字识别神经网络
第四章.误差反向传播法 4.3 误差反向传播法实现手写数字识别神经网络 通过像组装乐高积木一样组装第四章中实现的层,来构建神经网络。 1.神经网络学习全貌图 1).前提: 神经网络存在合适的权重和偏置,调整权重和偏置以便拟合训练数据的过程称…...
IB学习者的培养目标有哪些?
IB课程强调要培养年轻人的探究精神,在富有渊博知识的同时,更要勤于思考,敢于思考,尊重和理解跨文化的差异,坚持原则维护公平,让这个世界充满爱与和平,让这个世界变得更加美好。上一次我们为大家…...
C++类基础(十三)
类的继承 ● 通过类的继承(派生)来引入“是一个”的关系( 17.2 — Basic inheritance in C) – 通常采用 public 继承( struct V.S. class ) – 注意:继承部分不是类的声明 – 使用基类的指针…...
03 OpenCV图像运算
文章目录1 普通加法1 加号相加2 add函数2 加权相加3 按位运算1 按位与运算2 按位或运算、非运算4 掩膜1 普通加法 1 加号相加 在 OpenCV 中,图像加法可以使用加号运算符()来实现。例如,如果要将两幅图像相加,可以使用…...
【C语言学习笔记】:动态库
一、动态库 通过之前静态库那篇文章的介绍。发现静态库更容易使用和理解,也达到了代码复用的目的,那为什么还需要动态库呢? 1、为什么还需要动态库? 为什么需要动态库,其实也是静态库的特点导致。 ▶ 空间浪费是静…...
网站开发需要什么技能/百度指数入口
1.概述进程、线程、协程 1.1什么是进程和线程 进程是什么呢? 应用程序运行的本身就是一个进程,比如 玩红警或者打开一个应用软件,我们都可以通过任务管理器查看到。 进程是操作系统结构的基础,是正在运行的程序。 总而言之&#x…...
留学生做留服证明在哪个网站/关键词seo优化排名
思路:因为改变的数是同一个,所以最后对LIS的贡献最多只能是1,所以可以先求出最长上升子序列长度,然后每个改变的数,考虑它对LIS的加成是1还是0. 设a[i]表示已第i项结束的最长上升子序列长度,b[i]表示以第i…...
怎么自己做个网站/互联网推广是做什么的
1.desc 表名;–查看表的数据结构 2.select database();–查看当前所在数据库 3.distinct;–去重 4.MySQL中的"“号含义: 1.只做运算符 2.如果有字符串则尝试转换成数字,如果能转成数字则返回数字,如果不能则返回0 3.使用”"号拼接时…...
宁波网站建设报价/网络优化器下载
$keys "\\wuwu\\wuwuiw\\shssiu"; $mWuwu new $keys(); $describe $mWuwu->describe(); var_dump(expression);#111namespace wuwu\wuwuiw; class shssiu{public function describe(){return 111;} }...
重庆软件开发公司排名/seo兼职工资一般多少
preparation: solve the problem of from builtins import rang pip install futureupdate_rule转载于:https://www.cnblogs.com/LilyEvansHogwarts/p/9126393.html...
公司内部网站页面设计/热搜榜排名今日
构造器 1.父类中如果有无参构造器,在子类中的每个构造器首行都会有一个默认的隐式的super()指向父类的无参构造器。 2.如果父类中没有无参构造器,子类中的每个子类构造器的首行要显示调用父类的有参构造器,或者this调用本类构造器。 注意 …...