【Java】CompletableFuture 并发顺序调度
前言
Java CompletableFuture 提供了一种异步编程的方式,可以在一个线程中执行长时间的任务,而不会堵塞主线程。
和Future相比,CompletableFuture不仅实现了Future接口,也实现了 CompletionStage接口。Future接口不用多说,CompletionStage接口将多个CompletionStage执行顺序依赖给抽象了出来。
有了CompletableFuture接口,就能将多个异步事件的结果进行执行顺序编排。
使用
可数操作
一般使用 CompletableFuture的场景是有一个 a 操作,一个 b操作,还有一个 c 操作依赖 a、b两个操作的返回结果。可以直接使用 allOf()接受一长串的入参,也可以使用thenCombine()针对两个操作的特定情况。
public static void main(String[] argv) {CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(second * 20);} catch (InterruptedException e) {throw new RuntimeException(e);}return "1";});CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(second * 20);} catch (InterruptedException e) {throw new RuntimeException(e);}return "2";});CompletableFuture c9 = CompletableFuture.allOf(c1, c2);c9.thenApply(v -> {try {c1.get();c2.get();System.out.println("Everything is all right");} catch(Exception e) {e.printStackTrace();} finally {System.out.println("Something error");}return v;});c9.join();}
可变操作
当想要处理的 CompletableFuture 是可变的,比如说根据数据库查出的数据每个都需要执行一个 CompletableFuture 操作,也就是 n 个 CompletableFuture。
CompletableFuture<Void> allFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]));CompletableFuture<List<T>> result = allFuture.thenApply(v ->completableFutureList.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));List<T> tList = result.get(50, TimeUnit.SECONDS);
源码实现
CompletableFuture 类成员变量
CompletableFuture中有一个 volatile 关键词修饰的成员变量,result
,CompletableFuture.get()
函数中的返回的就是这个变量。它会先检查result变量是否为null,不为null则直接返回,为null则会根据是否可中断进行一个while循环等。
根据使用get()
或者 get(long timeout, TimeUnit unit)
函数的不同,最终等待result结果的函数也不同。get(long timeout, TimeUnit unit)
函数会是用 timedGet(long nanos) 函数进行等待。
/*** Waits if necessary for this future to complete, and then* returns its result.** @return the result value* @throws CancellationException if this future was cancelled* @throws ExecutionException if this future completed exceptionally* @throws InterruptedException if the current thread was interrupted* while waiting*/public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);}
除了代表结果的 result
之外,还有一个 Completion 类 的变量 stack
。从断点执行和代码的注解上看,这个stack
代表者从属当前CompletableFuture的操作。当前CompletableFuture操作执行完毕后(result里有结果),会引动其他Completion进行处理。
/* * A CompletableFuture may have dependent completion actions,* collected in a linked stack. It atomically completes by CASing* a result field, and then pops off and runs those actions. This* applies across normal vs exceptional outcomes, sync vs async* actions, binary triggers, and various forms of completions.*/*
可以通过截图看出 在Idea 内存中有和没有 Completion stack的CompletableFuture相比,有比没有多了 1 dependents的标记
Completion stack里没有东西的CompletableFuture | Completion stack里有东西的CompletableFuture |
---|---|
CompletableFuture 多个操作组织结构
CompletableFuture类能够通过 CompletableFuture.allOf()
或者 CompletableFuture.anyOf()
将多个CompletableFuture 对象组合在一起,等到满足条件时,再触发之后操作的执行。
以allOf
方法为例,CompletableFuture.allOf(CompletableFuture<?>... cfs)
方法会整合作为入参的所有CompletableFuture,等到他们呢所有的都完成之后,才返回结果。
/* ------------- Arbitrary-arity constructions -------------- *//*** Returns a new CompletableFuture that is completed when all of* the given CompletableFutures complete. If any of the given* CompletableFutures complete exceptionally, then the returned* CompletableFuture also does so, with a CompletionException* holding this exception as its cause. Otherwise, the results,* if any, of the given CompletableFutures are not reflected in* the returned CompletableFuture, but may be obtained by* inspecting them individually. If no CompletableFutures are* provided, returns a CompletableFuture completed with the value* {@code null}.** <p>Among the applications of this method is to await completion* of a set of independent CompletableFutures before continuing a* program, as in: {@code CompletableFuture.allOf(c1, c2,* c3).join();}.** @param cfs the CompletableFutures* @return a new CompletableFuture that is completed when all of the* given CompletableFutures complete* @throws NullPointerException if the array or any of its elements are* {@code null}*/public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);}/** Recursively constructs a tree of completions. */static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,int lo, int hi) {CompletableFuture<Void> d = new CompletableFuture<Void>();if (lo > hi) // emptyd.result = NIL;else {CompletableFuture<?> a, b;int mid = (lo + hi) >>> 1;if ((a = (lo == mid ? cfs[lo] :andTree(cfs, lo, mid))) == null ||(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :andTree(cfs, mid+1, hi))) == null)throw new NullPointerException();if (!d.biRelay(a, b)) {BiRelay<?,?> c = new BiRelay<>(d, a, b);a.bipush(b, c);c.tryFire(SYNC);}}return d;}/** Pushes completion to this and b unless both done. */final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {if (c != null) {Object r;while ((r = result) == null && !tryPushStack(c))lazySetNext(c, null); // clear on failureif (b != null && b != this && b.result == null) {Completion q = (r != null) ? c : new CoCompletion(c);while (b.result == null && !b.tryPushStack(q))lazySetNext(q, null); // clear on failure}}} /** Returns true if successfully pushed c onto stack. */final boolean tryPushStack(Completion c) {Completion h = stack;lazySetNext(c, h);return UNSAFE.compareAndSwapObject(this, STACK, h, c);} boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {Object r, s; Throwable x;if (a == null || (r = a.result) == null ||b == null || (s = b.result) == null)return false;if (result == null) {if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)completeThrowable(x, r);else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)completeThrowable(x, s);elsecompleteNull();}return true;}
从源码上看是,是将整个CompletableFuture数组通过andTree()
方法划分成了一颗二叉树,这个二叉树的叶子节点是传入的CompletableFuture对象,非叶子节点代表了它的子节点CompletableFuture的完成情况。
然后检测根节点的CompletableFuture的两个子节点是否完成。
cfs1、cfs2、cfs3、cfs4 是allOf
的入参,四个CompletableFuture对象。
代码中通过a.bipush(b, c)
将 a、b串在一起。因为涉及到UNSAFE方法,不知道方法具体执行了什么操作。所以只能通过IDEA里内存里实际的值,去由结果推过程。
a.bipush(b,c)
前,内存各个变量实际值。
a.bipush(b,c)
后,内存各个变量实际值。
tryPushStack(Completion c) 方法前
tryPushStack(Completion c)
方法后 可以看到内存中 变量b 对应的内存地址为 75bd9247的 stack
被赋值了成为了Completion c。
tryFire(int mode)
方法执行前
可以看到 cfs 除了 cfs1 之外,其他的 cfs 中的 stack都被赋值了。通过观察IDEA中内存中对象实际值,可以发现stack中 的 src 是 自己的树上的兄弟节点, snd 是自己。
CompletableFuture 多个操作执行顺序控制
CompletableFuture 一个节点要开始执行的前提是他的子节点全部执行完毕之后,才能触发自己节点上的操作。
当调用CompletableFuture 异步执行方法 supplyAsync
会传递一个 Supplier
对象作为入参。这个Supplier
会被封装成为 一个Runnable
子类 AsyncSupply
对象,作为其抽象方法 run
中 执行的一部分。
CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(second * 20);} catch (InterruptedException e) {throw new RuntimeException(e);}return "2";});---------------------------------------------------------------------------------/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the {@link ForkJoinPool#commonPool()} with* the value obtained by calling the given Supplier.** @param supplier a function returning the value to be used* to complete the returned CompletableFuture* @param <U> the function's return type* @return the new CompletableFuture*/public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}-----------------------------------------------------------------------------------public void run() {// fn 就是 CompletableFuture.supplyAsync 传入的 Supplier CompletableFuture<T> d; Supplier<T> f;if ((d = dep) != null && (f = fn) != null) {dep = null; fn = null;if (d.result == null) {try {// 将 Supplier 处理结果赋值给 CompletableFuture 的 resultd.completeValue(f.get());} catch (Throwable ex) {d.completeThrowable(ex);}} // Pops and tries to trigger all reachable dependents. Call only when known to be done.d.postComplete();}}
从源码中可以看到,当执行了CompletableFuture.supplyAsync()
他的通知机制封装在实现Runnable
抽象方法run
里。当你传入的Supplier
有结果返回之后,会调用 CompletableFuture 中的 postComplete()
方法,通知 stack
中其他可达的 从属 Completion
,让他们各自完成自己的 action。
/*** Pops and tries to trigger all reachable dependents. Call only* when known to be done.*/final void postComplete() {/** On each step, variable f holds current dependents to pop* and run. It is extended along only one path at a time,* pushing others to avoid unbounded recursion.*/CompletableFuture<?> f = this; Completion h;while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;if (f.casStack(h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}h.next = null; // detach}// 将 下一个需要执行的 Completion 弹出来后 执行 tryFiref = (d = h.tryFire(NESTED)) == null ? this : d;}}}static final class UniApply<T,V> extends UniCompletion<T,V> {Function<? super T,? extends V> fn;UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a;if ((d = dep) == null ||// uniApply 对封装的 Supplier 进行执行!d.uniApply(a = src, fn, mode > 0 ? null : this))return null;dep = null; src = null; fn = null;return d.postFire(a, mode);}} final <S> boolean uniApply(CompletableFuture<S> a,Function<? super S,? extends T> f,UniApply<S,T> c) {Object r; Throwable x;if (a == null || (r = a.result) == null || f == null)return false;tryComplete: if (result == null) {if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {completeThrowable(x, r);break tryComplete;}r = null;}try {if (c != null && !c.claim())return false;@SuppressWarnings("unchecked") S s = (S) r;// 这里实际执行 CompletableFuture 的 SuppliercompleteValue(f.apply(s));} catch (Throwable ex) {completeThrowable(ex);}}return true;}
从Idea里的 栈帧中可以看出来,是由 CompletableFuture 1 执行完后的 postComplete 引发了接下来的CompletableFuture
相关文章:
【Java】CompletableFuture 并发顺序调度
前言 Java CompletableFuture 提供了一种异步编程的方式,可以在一个线程中执行长时间的任务,而不会堵塞主线程。 和Future相比,CompletableFuture不仅实现了Future接口,也实现了 CompletionStage接口。Future接口不用多说&#…...
职场人必备的6款实用办公app,每一款都是心头爱
打工人不容易啊,不提高工作效率怕是要被淘汰了。今天给大家分享6款职场人必备的实用办公APP,免费效率神器让工作事半功倍。这些APP每一款都是我的心头爱,肯定会让人大开眼界的,超级实用,直接往下看吧。1、向日葵远程控…...
小丑改造计划之复习一
1.函数重载 根据参数个数 参数顺序 参数类型 的不同 可以在同一个域存在多个同名函数 但是不可以根据返回值 缺省参数的不同去重载函数 2.指针和引用的区别 第一点 指针是内存地址,会开辟内存空间,而引用和它所引用的变量共享同一块内存 第二点 引用必须…...
final修饰符使用中遇到的一些问题
文章目录final修饰符1. final不能用来修饰构造方法2. final修饰变量的一些注意问题2.1 final修饰成员变量2.2 final修饰引用类型2.2.1 演示代码中lombok链式编程介绍final修饰符 final具有“不可改变”的含义,它可以修饰非抽象类、非抽象成员方法和变量。 用final…...
好记又实用的获取电脑型号方法
个人常用的方法 方法二最好记又好用。 方法一 dxdiag命令 按下键盘WINR调出运行在输入框输入dxdiag命令后,按下回车;进入DirectX诊断工具,便可查看系统型号等信息。 这里就会显示系统型号。 方法二 设备和打印机 控制面板-查看方式-小图…...
@Transactional配置详解
一:事务注解Transactional,属性propagation的7个配置 PROPAGATION_REQUIRED -- 支持当前事务,如果当前没有事务,就新建一个事务。,默认配置,也是常用的选择。 PROPAGATION_SUPPORTS -- 支持当前事务&#…...
性能测试面试题汇总
稳定性测试的怎么挑选的接口? 1、频繁使用的接口:选择那些被频繁使用的接口,因为这些接口可能会面临更大的负载和并发访问,从而可能导致性能问题。 2、核心功能接口:选择那些实现系统核心功能的接口,因为这…...
vue权限控制和动态路由
思路 登录:当用户填写完账号和密码后向服务端验证是否正确,验证通过之后,服务端会返回一个token,拿到token之后(我会将这个token存贮到localStore中,保证刷新页面后能记住用户登录状态)…...
利用正则表达式删掉代码中的所有注释-pycharm为例
首先删除注释 打开您想要删除注释的Python文件。 使用快捷键 Ctrl Shift R 打开 "Replace in Files"(在文件中替换)对话框。 在 "Find"(查找)框中输入以下正则表达式,以查找所有行中的注释内容…...
【java基础】内部类、局部内部类、匿名内部类、静态内部类
内部类 内部类就是定义在另一个类中的类。我们使用内部类的原因主要有以下两点 内部类可以对同一个包中的其他类隐藏内部类方法可以访问定义这个类的作用域中的数据,包括原本私有的数据 public class A {class B {} }我们使用内部类可以访问外部类的所有属性&…...
react renderProps学习记录
react renderProps学习记录1.引入2.改一下呢3.再改一下呢4.总结一下如何向组件内部动态传入带内容的结构(标签)?children propsrender props1.引入 上代码: import React, { Component } from react import ./index.css export default class Parent extends Com…...
关于tf.gather函数batch_dims参数用法的理解
关于tf.gather函数batch_dims参数用法的理解0 前言1. 不考虑batch_dims2. 批处理(考虑batch_dims)2.1 batch_dims12.2 batch_dims02.3 batch_dims>22.4 batch_dims再降为12.5 再将axis降为12.6 batch_dims<02.7 batch_dims总结3. 补充4. 参数和返回值5. 其他相关论述6. 附…...
日常操作linux常用命令
cd /mnt/opt/cqstt/logs/stt-erp docker logs -f --tail1000 stt-erp # 查看物理CPU个数 cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l # 查看每个物理CPU中core的个数(即核数) cat /proc/cpuinfo| grep "cpu cores"| uniq # 查看逻辑CPU的…...
【Java集合框架】篇二:Collection接口方法
JDK不提供此接口的任何直接实现类,而是提供更具体的子接口(如:Set和List)去实现。 Collection 接口是 List和Set接口的父接口,该接口里定义的方法既可用于操作 Set 集合,也可用于操作 List 集合。方法如下…...
PHP入门指南:简单易学的语法和丰富的调试工具与安全性最佳实践
PHP是一种非常流行的服务器端编程语言,它被广泛地应用于Web开发中。如果您想学习Web开发,那么PHP是一个非常好的选择。在本文中,我将介绍PHP的一些基础知识,包括语法、变量、函数、数组、数据库连接、调试和安全性等。PHP的语法PH…...
前端面试题--HTML篇
一、src和href的区别src指向外部资源的位置,指向的内容会嵌入到文档中当前标签所在的位置;href指向网络资源的位置,建立和当前元素或当前文档之间的链接。二、对HTML语义化的理解根据内容的结构化,选择合适的标签。优点࿱…...
SpringBoot集成ElasticSearch,实现模糊查询,批量CRUD,排序,分页,高亮
导入elasticsearch依赖在pom.xml里加入如下依赖:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>非常重要:检查依赖版本…...
常用Swagger注解汇总
常用Swagger注解汇总 前言 在实际编写后端代码的过程中,我们可能经常使用到 swagger 注解,但是会用不代表了解,你知道每个注解都有什么属性吗?你都用过这些属性吗?了解它们的作用吗?本文在此带大家总结一下…...
关于 TypeScript 声明文件
declare var 声明全局变量declare function 声明全局方法declare class 声明全局类declare enum 声明全局枚举类型declare namespace 声明(含有子属性的)全局对象interface 和 type 声明全局类型export 导出变量export namespace 导出(含有子…...
SpringBoot学习-原理篇
SpringBoot原理篇springboot技术本身就是为了加速spring程序的开发的,可以大胆的说,springboot技术没有自己的原理层面的设计,仅仅是实现方案进行了改进。将springboot定位成工具,你就不会去想方设法的学习其原理了。就像是将木头…...
目标检测YOLOv5数据集怎么找?
完整的配置-标注-训练-识别在我这篇博客小白YOLOv5全流程-训练实现数字识别_yolov5数字识别_牛大了2022的博客-CSDN博客 模型部分剖析可以看我每周深度学习笔记部分。关于训练的数据集怎么搞很多人问过我,我在这篇文章给大家一点我的经验和建议。 数据集是什么 简…...
安卓短信自动填充踩坑
安卓短信自动填充踩坑 前言 最近弄了个短信自动填充功能,一开始觉得很简单,不就是动态注册个广播接收器去监听短信消息不就可以了吗?结果没这么简单,问题就出在机型的适配上。小米的短信权限、荣耀的短信监听都是坑,…...
【抽象类和接口的区别】
抽象类和接口都是Java中实现多态的机制,它们都是用来约束子类必须要实现的方法。但是它们有以下区别: 实现方式 实现方式:抽象类是一个类,而接口是一个接口。一个类只能继承一个抽象类,但可以实现多个接口。 构造方…...
接口导出文件功能
1.写接口 export function getExport(params) { return fetch({ url: ******.export, method: post, data: params, responseType:blob, }) } 2.编写前端页面 <el-button :loading"exportDisable" :disabled&quo…...
深圳大学计软《面向对象的程序设计》实验9 期中复习
A. 机器人变身(类与对象)【期中模拟】 题目描述 编写一个机器人类,包含属性有机器名、血量、伤害值、防御值、类型和等级。其中血量、伤害和防御和等级、类型相关: 普通型机器人,类型为N,血量、伤害、防…...
python之异步编程
一、异步编程概述 异步编程是一种并发编程的模式,其关注点是通过调度不同任务之间的执行和等待时间,通过减少处理器的闲置时间来达到减少整个程序的执行时间;异步编程跟同步编程模型最大的不同就是其任务的切换,当遇到一个需要等…...
为什么很多计算机专业大学生毕业后还会参加培训?
基于IT互联网行业越来越卷的现状,就算是科班出身,很多也是达不到用人单位的要求。面对这样的现实情况,有的同学会选择继续深造,比如考个研,去年考研人数457万人次,可见越来越的同学是倾向考研提升学历来达到…...
JUC并发编程之JMM_synchronized_volatile
目录 JUC并发编程之JMM_synchronized_volatile 什么是JMM模型? JMM和JVM的区别 JMM不同于JVM内存区域模型 主内存 工作内存 Java内存模型与硬件内存架构的关系 JMM存在的必要性 数据同步八大原子操作 同步规则分析 并发编程的可见性,原子性与有序…...
hashCode 和 equals 的处理
文章目录hashCode 和 equals 的处理1. 阿里巴巴编程规范要求2. equals和hashcode的分析2.1 Object方法2.2 只覆写(Override)equals带来的问题问题演示问题分析问题处理hashCode 和 equals 的处理 1. 阿里巴巴编程规范要求 2. equals和hashcode的分析 2…...
17. OPenGL实现旋转移动物体
1. 说明: 整体思路:如果想实现动态,可以使用一个矩阵和我们给定的坐标值进行相乘,实时的改变坐标值 类似于坐标的齐次变换,然后使用一个定时器,在规定时间内触发重新绘制的函数。 实际效果: OP…...
新闻网站建设的任务要求/网络营销推广方案3篇
左移运算符就是在二进制的基础上对数字进行平移。按照平移的方向和填充数字的规则分为三种:<<(左移)、>>(带符号右移)和>>>(无符号右移)。 在 移位运算时,byte、 short和char类型移位后的结果会变成int类型,对于byte、s…...
西安网站设计报价/网站自然排名工具
目录 一、实验原理 二、实验拓扑 三、实验步骤 四、实验过程 总结 实验难度3实验复杂度3一、实验原理 我们在配置路由器ACL的时候都是一个需求一个ACL这样来配置,这种做法是比较严谨的,但是如果需求变得很多了呢?例如,下图…...
织梦可以做论坛网站/浙江专业网站seo
less-10 GET - Blind - Time based - double quotes (基于时间的双引号盲注) 由于不管怎么输入都会被过滤,返回同一个结果, 所以只能用时间延迟注入 开始注入 ?id1and sleep(3) --此时注入成功 开始爆库 ?id1’ and if(length(database())4 , sle…...
陕西做网站公司有哪些/怎样搭建一个网站
松下焊接机器人-基本操作篇Panasonic 业机器人机器人专用教材G 系列基本操作 篇>>机器人本体:VR2/TA/TB 控制装置:WG/G2/GXPYA-1NC/YA-1PC/YA-1QC/YA-1RC/YA-1SC 系列唐山松下产业机器 (有)机器人学校200906皆以機器人之操作難甚&…...
wordpress终极优化/网店代运营
设计数据库之前,需要绘制ER图,通过ER图创建出对应的表结构,后期可能需要修改表结构,那么还需要修改ER图,甚至重写,很麻烦,浪费大量时间。 powerdesigner的好处是:在创建好ER图之后&a…...
php用什么做网站服务器吗/星乐seo网站关键词排名优化
276. 栅栏涂色 有 k 种颜色的涂料和一个包含 n 个栅栏柱的栅栏,请你按下述规则为栅栏设计涂色方案: 每个栅栏柱可以用其中 一种 颜色进行上色。 相邻的栅栏柱 最多连续两个 颜色相同。 给你两个整数 k 和 n ,返回所有有效的涂色 方案数 。…...