当前位置: 首页 > news >正文

JAVA中使用CompletableFuture进行异步编程

JAVA中使用CompletableFuture进行异步编程

1、什么是CompletableFuture

CompletableFuture 是 JDK8 提供的 Future 增强类,CompletableFuture 异步任务执行线程池,默认是把异步任

务都放在 ForkJoinPool 中执行。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成,主线程可以并行的执行其他任务。

2、Future存在的问题

Future 实际采用 FutureTask 实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务

的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接

口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。

通常的线程池接口类 ExecutorService,其中execute方法的返回值是void,即无法获取异步任务的执行状态,3个

重载的 submit 方法的返回值是 Future,可以据此获取任务执行的状态和结果。

package com;import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务ExecutorService executorService = Executors.newSingleThreadExecutor();Future<Double> cf = executorService.submit(() -> {System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}if (false) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());return 1.2;}});executorService.shutdown();System.out.println("main thread start,time->" + System.currentTimeMillis());// 等待子任务执行完成,如果已完成则直接返回结果// 如果执行任务异常,则get方法会把之前捕获的异常重新抛出System.out.println("run result->" + cf.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
main thread start,time->1693019539222
Thread[pool-1-thread-1,5,main] start,time->1693019539222
Thread[pool-1-thread-1,5,main] exit,time->1693019541223
run result->1.2
main thread exit,time->1693019541226

子线程是异步执行的,主线程休眠等待子线程执行完成,子线程执行完成后唤醒主线程,主线程获取任务执行结果

后退出。

很多博客说使用不带等待时间限制的get方法时,如果子线程执行异常了会导致主线程长期阻塞,这其实是错误

的,子线程执行异常时其异常会被捕获,然后修改任务的状态为异常结束并唤醒等待的主线程,get方法判断任务

状态发生变更,就终止等待了,并抛出异常。将上述用例中if(false)改成if(true) ,执行结果如下:

main thread start,time->1693019872552
Thread[pool-1-thread-1,5,main] start,time->1693019872552
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: testat java.util.concurrent.FutureTask.report(FutureTask.java:122)at java.util.concurrent.FutureTask.get(FutureTask.java:192)at com.FutureTest.test(FutureTest.java:34)at com.FutureTest.main(FutureTest.java:11)
Caused by: java.lang.RuntimeException: testat com.FutureTest.lambda$test$0(FutureTest.java:25)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)

get方法抛出异常导致主线程异常终止。

Future 的局限性:它没法直接对多个任务进行链式、组合等处理,需要借助并发工具类才能完成,实现逻辑比较

复杂。

而 CompletableFuture 是对 Future 的扩展和增强,CompletableFuture 实现了 Future 接口,并在此基础上进

行了丰富的扩展,完美弥补了 Future 的局限性,同时 CompletableFuture 实现了对任务编排的能力。借助这项

能力,可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以

往,虽然通过 CountDownLatch 等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以

维护。

CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、

流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

CompletionStage 接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程

池是 ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。

CompletableFuture 中默认线程池如下:

// 根据commonPool的并行度来选择,而并行度的计算是在ForkJoinPool的静态代码段完成的
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool 中初始化 commonPool 的参数:

static {// initialize field offsets for CAS etctry {U = sun.misc.Unsafe.getUnsafe();Class<?> k = ForkJoinPool.class;CTL = U.objectFieldOffset(k.getDeclaredField("ctl"));RUNSTATE = U.objectFieldOffset(k.getDeclaredField("runState"));STEALCOUNTER = U.objectFieldOffset(k.getDeclaredField("stealCounter"));Class<?> tk = Thread.class;……} catch (Exception e) {throw new Error(e);}commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;defaultForkJoinWorkerThreadFactory =new DefaultForkJoinWorkerThreadFactory();modifyThreadPermission = new RuntimePermission("modifyThread");// 调用makeCommonPool方法创建commonPool,其中并行度为逻辑核数-1common = java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<ForkJoinPool>() {public ForkJoinPool run() { return makeCommonPool(); }});int par = common.config & SMASK; // report 1 even if threads disabledcommonParallelism = par > 0 ? par : 1;
}

3、CompletableFuture功能

3.1 依赖关系

thenApply():把前面任务的执行结果,交给后面的Function。

thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回。

3.2 and集合关系

thenCombine():合并任务,有返回值。

thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值。

runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务)。

3.3 or聚合关系

applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值。

acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值。

runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)。

3.4 并行执行

allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture。

anyOf():当任何一个给定的 CompletablFuture 完成时,返回一个新的 CompletableFuture。

3.5 结果处理

whenComplete:当任务完成时,将使用结果(或null)和此阶段的异常(或 null如果没有)执行给定操作。

exceptionally:返回一个新的 CompletableFuture,当前面的 CompletableFuture 完成时,它也完成,当它

异常完成时,给定函数的异常触发这个 CompletableFuture 的完成。

3、CompletableFuture(runAsync和supplyAsync)创建异步任务

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个方法的区别:

  • runAsync()Runnable函数式接口类型为参数,没有返回结果,supplyAsync()Supplier函数式接

    口类型为参数,返回结果类型为USupplier接口的get()是有返回值的(会阻塞)。

  • 使用没有指定Executor的方法时,内部使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

    如果指定线程池,则使用指定的线程池运行。

  • 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU

    的核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置

    ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一

    些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统

    的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

supplyAsync 表示创建带返回值的异步任务的,相当于 ExecutorService submit(Callable task) 方法。

runAsync 表示创建无返回值的异步任务,相当于 ExecutorService submit(Runnable task)方法。

这两方法的效果跟 submit 是一样的。

这两方法各有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用

ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每

次执行execute都会创建一个新线程。

3.1 runAsync

runAsync 没有返回值

package com;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;public class RunAsyncTest {public static void main(String[] args) {List<Integer> numberList = new ArrayList<>();for (int i = 1; i < 11; i++) {numberList.add(i);}System.out.println("start!");long start = System.currentTimeMillis();for (Integer number : numberList) {CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);System.out.println(number);} catch (InterruptedException e) {e.printStackTrace();}});}long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start));System.out.println("end!");}
}
# 程序输出
start!
耗时:44
end!
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test1 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务,无返回值CompletableFuture cf = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());}});System.out.println("main thread start,time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("run result->" + cf.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
main thread start,time->1693021661122
Thread[ForkJoinPool.commonPool-worker-1,5,main] start,time->1693021661122
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit,time->1693021663123
run result->null
main thread exit,time->1693021663123
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 自定义线程池*/
public class Test3 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newSingleThreadExecutor();// 创建异步执行任务:CompletableFuture cf = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());}}, executorService);System.out.println("main thread start,time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("run result->" + cf.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
main thread start,time->1693022272784
Thread[pool-1-thread-1,5,main] start,time->1693022272784
Thread[pool-1-thread-1,5,main] exit,time->1693022274784
run result->null
main thread exit,time->1693022274784

3.2 supplyAsync

supplyAsync 有返回值

package com;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class SupplyAsynctest {public static void main(String[] args) {List<Integer> numberList = new ArrayList<>();for (int i = 1; i < 11; i++) {numberList.add(i);}List<CompletableFuture<Integer>> futureList = new ArrayList<>();System.out.println("start!");long start = System.currentTimeMillis();for (Integer number : numberList) {futureList.add(CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);return number;} catch (InterruptedException e) {e.printStackTrace();return 0;}}));}for (CompletableFuture<Integer> completableFuture : futureList) {Integer number = null;try {number = completableFuture.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(number);}long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start));System.out.println("end!");}
}
# 程序输出
start!
1
2
3
4
5
6
7
8
9
10
耗时:2047
end!
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test2 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务,有返回值CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());return 1.2;}});System.out.println("main thread start,time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("run result->" + cf.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}}
# 程序输出
main thread start,time->1693021915960
Thread[ForkJoinPool.commonPool-worker-1,5,main] start,time->1693021915960
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit,time->1693021917960
run result->1.2
main thread exit,time->1693021917962
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test4 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool = new ForkJoinPool();// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());return 1.2;}}, pool);System.out.println("main thread start,time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("run result->" + cf.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}}
# 程序输出
main thread start,time->1693022336576
Thread[ForkJoinPool-1-worker-1,5,main] start,time->1693022336576
Thread[ForkJoinPool-1-worker-1,5,main] exit,time->1693022338577
run result->1.2
main thread exit,time->1693022338578

4、获取结果(get和join)

join()get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck

常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException

InterruptedException 需要用户手动处理(抛出或者 try catch)。

5、结果处理( whenComplete和exceptionally和handle)

5.1 whenComplete和exceptionally方法

CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方

法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
  • whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。

  • Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异

    常情况。

  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用

    相同的线程池,也可能会被同一个线程选中执行)。

  • 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture

    的计算结果或者返回异常。

  • whenCompletewhenCompleteAsync 的区别:whenComplete是执行当前任务的线程继续执行

    whenComplete的任务。whenCompleteAsync是执行把whenCompleteAsync 这个任务继续提交给线程池来

    进行执行。

whenComplete 是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方

法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常

执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该

任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果。

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;public class WhenCompleteAndExceptionally {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}Integer randomNum = new Random().nextInt(10);if (randomNum % 2 == 0) {System.out.println("任务发生异常,返回给exceptionally!");Integer num = 12 / 0;}System.out.println("任务处理完成,返回给whenComplete!");return randomNum;});// 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行// 如果出现了异常,任务结果为nullfuture.whenComplete(new BiConsumer<Integer, Throwable>() {@Overridepublic void accept(Integer t, Throwable action) {System.out.println("任务正常,接收supplyAsync的返回值," + "结果是:" + t);}});// 出现异常时先执行(exceptionally),然后再执行(whenComplete)// 如果没有出现异常,(exceptionally)不会被执行future.exceptionally(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable t) {System.out.println("任务异常,接收supplyAsync的返回值,异常是:" + t.getMessage());return -1;}});// 如果发生了异常,此处无法获取返回值try {Integer result = future.get();System.out.println("get()获取到的结果是:" + result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
# 无异常 supplyAsync->whenComplete->get()
任务处理完成,返回给whenComplete!
任务正常,接收supplyAsync的返回值,结果是:9
get()获取到的结果是:9
# 程序输出
# 出现异常 supplyAsync->exceptionally->whenComplete
任务发生异常,返回给exceptionally!
任务异常,接收supplyAsync的返回值,异常是:java.lang.ArithmeticException: / by zero
任务正常,接收supplyAsync的返回值,结果是:null
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)at com.WhenCompleteAndExceptionally.main(WhenCompleteAndExceptionally.java:48)
Caused by: java.lang.ArithmeticException: / by zeroat com.WhenCompleteAndExceptionally.lambda$main$0(WhenCompleteAndExceptionally.java:22)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

另一种写法:

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;public class WhenCompleteAndExceptionally2 {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}Integer randomNum = new Random().nextInt(10);if (randomNum % 2 == 0) {System.out.println("任务发生异常,返回给exceptionally!");Integer num = 12 / 0;}System.out.println("任务处理完成,返回给whenComplete!");return randomNum;})// 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行// 如果出现了异常,任务结果为null.whenComplete(new BiConsumer<Integer, Throwable>() {@Overridepublic void accept(Integer t, Throwable action) {System.out.println("任务正常,接收supplyAsync的返回值," + "结果是:" + t);}})// 出现异常时先执行(exceptionally),然后再执行(whenComplete)// 如果没有出现异常,(exceptionally)不会被执行.exceptionally(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable t) {System.out.println("任务异常,接收supplyAsync的返回值,异常是:" + t.getMessage());return -1;}});// 如果发生了异常,此处无法获取返回值try {Integer result = future.get();System.out.println("get()获取到的结果是:" + result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
# 无异常 supplyAsync->whenComplete->get()
任务处理完成,返回给whenComplete!
任务正常,接收supplyAsync的返回值,结果是:7
get()获取到的结果是:7
# 程序输出
# 有异常 supplyAsync->whenComplete->exceptionally->get()
任务发生异常,返回给exceptionally!
任务正常,接收supplyAsync的返回值,结果是:null
任务异常,接收supplyAsync的返回值,异常是:java.lang.ArithmeticException: / by zero
get()获取到的结果是:-1
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;public class WhenCompleteAndExceptionally3 {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}Integer randomNum = new Random().nextInt(10);if (randomNum % 2 == 0) {System.out.println("任务发生异常,返回给exceptionally!");Integer num = 12 / 0;}System.out.println("任务处理完成,返回给whenComplete!");return randomNum;});// 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行// 如果出现了异常,任务结果为nullCompletableFuture<Integer> future1 = future.whenComplete(new BiConsumer<Integer, Throwable>() {@Overridepublic void accept(Integer t, Throwable action) {System.out.println("任务正常,接收supplyAsync的返回值," + "结果是:" + t);}});// 出现异常时先执行(exceptionally),然后再执行(whenComplete)// 如果没有出现异常,(exceptionally)不会被执行CompletableFuture<Integer> future2 = future.exceptionally(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable t) {System.out.println("任务异常,接收supplyAsync的返回值,异常是:" + t.getMessage());return -1;}});// 如果发生了异常,此处无法获取返回值try {Integer result = future2.get();System.out.println("get()获取到的结果是:" + result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
# 无异常 supplyAsync->whenComplete->get()
任务处理完成,返回给whenComplete!
任务正常,接收supplyAsync的返回值,结果是:3
get()获取到的结果是:3
# 程序输出
# 有异常 supplyAsync->whenComplete->exceptionally->get()
任务发生异常,返回给exceptionally!
任务异常,接收supplyAsync的返回值,异常是:java.lang.ArithmeticException: / by zero
任务正常,接收supplyAsync的返回值,结果是:null
get()获取到的结果是:-1
package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;public class WhenCompleteAndExceptionally4 {public static void main(String[] args) {CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {@Overridepublic Object get() {System.out.println(Thread.currentThread().getName() + " completableFuture");int i = 10 / 0;return 1024;}});CompletableFuture future1 = future.whenComplete(new BiConsumer<Object, Throwable>() {@Overridepublic void accept(Object o, Throwable throwable) {System.out.println("-------O=" + o);System.out.println("-------throwable=" + throwable);}});CompletableFuture future2 = future.exceptionally(new Function<Throwable, Object>() {@Overridepublic Object apply(Throwable throwable) {System.out.println("throwable=" + throwable);return 6666;}});try {System.out.println("结果是:" + future2.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
# 有异常 supplyAsync->whenComplete->exceptionally->get()
ForkJoinPool.commonPool-worker-1 completableFuture
-------O=null
-------throwable=java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
throwable=java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
结果是:6666
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test5 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "job1 start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + "job1 exit,time->" + System.currentTimeMillis());return 1.2;}});//cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为nullCompletableFuture<Double> cf2 = cf.whenComplete((a, b) -> {System.out.println(Thread.currentThread() + "job2 start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (b != null) {System.out.println("error stack trace->");b.printStackTrace();} else {System.out.println("run succ,result->" + a);}System.out.println(Thread.currentThread() + "job2 exit,time->" + System.currentTimeMillis());});//等待子任务执行完成System.out.println("main thread start wait,time->" + System.currentTimeMillis());//如果cf是正常执行的,cf2.get的结果就是cf执行的结果//如果cf是执行异常,则cf2.get会抛出异常System.out.println("run result->" + cf2.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time->1693027350240
main thread start wait,time->1693027350240
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 exit,time->1693027352241
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time->1693027352241
run succ,result->1.2
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time->1693027354241
run result->1.2
main thread exit,time->1693027354241

将上述示例中的if(false) 改成if(true),其输出如下:

# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time->1693027394831
main thread start wait,time->1693027394832
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time->1693027396832
error stack trace->
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time->1693027398834
java.util.concurrent.CompletionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: testat com.test.Test5.lambda$test$0(Test5.java:21)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)at com.test.Test5.test(Test5.java:46)at com.test.Test5.main(Test5.java:9)
Caused by: java.lang.RuntimeException: testat com.test.Test5.lambda$test$0(Test5.java:21)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test6 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool = new ForkJoinPool();// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "job1 start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + "job1 exit,time->" + System.currentTimeMillis());return 1.2;}}, pool);//cf执行异常时,将抛出的异常作为入参传递给回调方法CompletableFuture<Double> cf2 = cf.exceptionally((param) -> {System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println("error stack trace->");param.printStackTrace();System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());return -1.1;});System.out.println("main thread start,time->" + System.currentTimeMillis());//等待子任务执行完成,此处无论是job2和job3都可以实现job2退出,主线程才退出,如果是cf,则主线程不会等待job2执行完成自动退出了//cf2.get时,没有异常,但是依然有返回值,就是cf的返回值System.out.println("run result->" + cf2.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time->1693038412532
main thread start,time->1693038412532
Thread[ForkJoinPool-1-worker-1,5,main]job1 exit,time->1693038414533
run result->1.2
main thread exit,time->1693038414534

将上述示例中的if(true) 改成if(false),其输出如下:

# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time->1693038448098
main thread start,time->1693038448098
Thread[ForkJoinPool-1-worker-1,5,main] start,time->1693038450099
error stack trace->
Thread[ForkJoinPool-1-worker-1,5,main] exit,time->1693038452104
run result->-1.1
main thread exit,time->1693038452107
java.util.concurrent.CompletionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: testat com.test.Test6.lambda$test$0(Test6.java:23)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more

5.2 handle 方法

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

handle是执行任务完成时对结果的处理,handle是在任务完成后再执行,还可以处理异常的任务。

handleAsync方法即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。

跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的

result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。

package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class handleAsyncTest {public static void main(String[] args) {System.out.println("main start ...");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("开启异步任务...");int i = 10 % 2;if (i == 0) {throw new RuntimeException("远程服务调用失败");}return i;}).handleAsync((res, thr) -> {System.out.println("进入handleAsync方法");if (res != null) {return res * 2;}if (thr != null) {System.out.println("捕获到异常:" + thr);return 0;}return 10;});try {Integer result = future.get();System.out.println("获取异步任务返回值:" + result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("main end ...");}
}
# 程序输出
# 有异常
main start ...
开启异步任务...
进入handleAsync方法
捕获到异常:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
获取异步任务返回值:0
main end ...
# 程序输出
# 无异常
main start ...
开启异步任务...
进入handleAsync方法
获取异步任务返回值:2
main end ...
package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class handleAsyncTest2 {public static void main(String[] args) {CountDownLatch countDownLatch = new CountDownLatch(2);System.out.println("start!");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " 进行一连串操作1....");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return 1;});// whenComplete方法,返回的future的结果还是1CompletableFuture<Integer> future = future1.whenComplete((x, y) -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " whenComplete,future返回:" + x);});// handler返回的future结果是字符串"2"CompletableFuture<String> handle = future.handle((x, y) -> {System.out.println(Thread.currentThread().getName() + " handle接收的结果:" + x);countDownLatch.countDown();return "2";});CompletableFuture<Integer> handle1 = handle.handle((x, y) -> {System.out.println(Thread.currentThread().getName() + " handle返回的结果:" + x);countDownLatch.countDown();return 2;});try {countDownLatch.await();System.out.println("接收到的返回值为:"+handle1.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}System.out.println("end!");}
}
package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class handleAsyncTest2 {public static void main(String[] args) {CountDownLatch countDownLatch = new CountDownLatch(2);System.out.println("start!");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " 进行一连串操作1....");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return 1;});// whenComplete方法,返回的future的结果还是1CompletableFuture<Integer> future = future1.whenComplete((x, y) -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " whenComplete,future返回:" + x);});// handler返回的future结果是字符串"2"CompletableFuture<String> handle = future.handle((x, y) -> {System.out.println(Thread.currentThread().getName() + " handle接收的结果:" + x);countDownLatch.countDown();return "2";});CompletableFuture<Integer> handle1 = handle.handle((x, y) -> {System.out.println(Thread.currentThread().getName() + " handle返回的结果:" + x);countDownLatch.countDown();return 2;});try {countDownLatch.await();System.out.println("接收到的返回值为:"+handle1.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}System.out.println("end!");}
}
# 程序输出
start!
ForkJoinPool.commonPool-worker-1 进行一连串操作1....
ForkJoinPool.commonPool-worker-1 whenComplete,future返回:1
ForkJoinPool.commonPool-worker-1 handle接收的结果:1
ForkJoinPool.commonPool-worker-1 handle返回的结果:2
接收到的返回值为:2
end!
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test7 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "job1 start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (false) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + "job1 exit,time->" + System.currentTimeMillis());return 1.2;}});//cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为nullCompletableFuture<String> cf2 = cf.handle((a, b) -> {System.out.println(Thread.currentThread() + "job2 start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (b != null) {System.out.println("error stack trace->");b.printStackTrace();} else {System.out.println("run succ,result->" + a);}System.out.println(Thread.currentThread() + "job2 exit,time->" + System.currentTimeMillis());if (b != null) {return "run error";} else {return "run succ";}});//等待子任务执行完成System.out.println("main thread start wait,time->" + System.currentTimeMillis());//get的结果是cf2的返回值,跟cf没关系了System.out.println("run result->" + cf2.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}}
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time->1693040276755
main thread start wait,time->1693040276755
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 exit,time->1693040278755
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time->1693040278755
run succ,result->1.2
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time->1693040280757
run result->run succ
main thread exit,time->1693040280758

将上述示例中的if(true) 改成if(false),其输出如下:

# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time->1693040314676
main thread start wait,time->1693040314677
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time->1693040316676
error stack trace->
Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time->1693040318680
run result->run error
main thread exit,time->1693040318681
java.util.concurrent.CompletionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: testat com.test.Test7.lambda$test$0(Test7.java:21)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more

6、结果转换(thenApply和thenCompose)

将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

6.1 thenApply

thenApply接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处

理结果的Future对象。

常用使用:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

T:上一个任务返回结果的类型。

U:当前任务的返回值类型。

thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传

递到回调方法中。

package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class ThenApplyTest {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int result = 100;System.out.println("第一次运算:" + result);return result;}).thenApply(number -> {int result = number * 3;System.out.println("第二次运算:" + result);return result;});try {Integer result = future.get();System.out.println("结果是:" + result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 运行程序
第一次运算:100
第二次运算:300
结果是:300
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test8 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool = new ForkJoinPool();// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());return 1.2;}, pool);//cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中//thenApply这里实际创建了一个新的CompletableFuture实例CompletableFuture<String> cf2 = cf.thenApply((result) -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return "test:" + result;});System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("run result->" + cf.get());System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());System.out.println("run result->" + cf2.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 运行程序
Thread[ForkJoinPool-1-worker-1,5,main] start job1,time->1693042190558
main thread start cf.get(),time->1693042190558
Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time->1693042192558
Thread[ForkJoinPool-1-worker-1,5,main] start job2,time->1693042192558
run result->1.2
main thread start cf2.get(),time->1693042192559
Thread[ForkJoinPool-1-worker-1,5,main] exit job2,time->1693042194558
run result->test:1.2
main thread exit,time->1693042194558

job1执行结束后,将job1的方法返回值作为入参传递到job2中并立即执行job2。thenApplyAsync与thenApply的

区别在于,前者是将job2提交到线程池中异步执行,实际执行job2的线程可能是另外一个线程,后者是由执行

job1的线程立即执行job2,即两个job都是同一个线程执行的。

package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test9 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool = new ForkJoinPool();// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());return 1.2;}, pool);//cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中//thenApply这里实际创建了一个新的CompletableFuture实例CompletableFuture<String> cf2 = cf.thenApplyAsync((result) -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return "test:" + result;});System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("run result->" + cf.get());System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());System.out.println("run result->" + cf2.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main] start job1,time->1693042362021
main thread start cf.get(),time->1693042362022
Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time->1693042364022
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job2,time->1693042364023
run result->1.2
main thread start cf2.get(),time->1693042364024
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job2,time->1693042366024
run result->test:1.2
main thread exit,time->1693042366024

从输出可知,执行job1和job2是两个不同的线程。

Executor实现,如果不指定,默认使用ForkJoinPool.commonPool()。 下述的多个方法,每个方法都有两个以

Async结尾的方法,一个使用默认的Executor实现,一个使用指定的Executor实现,不带Async的方法是由触发该

任务的线程执行该任务,带Async的方法是由触发该任务的线程将任务提交到线程池,执行任务的线程跟触发任务

的线程不一定是同一个。

6.2 thenCompose

thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果。

常用方法:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;

thenCompose 方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会

返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该

result的新的CompletableFuture实例,然后执行这个新任务。

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;public class ThenComposeTest {public static void main(String[] args) {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(30);System.out.println("第一次运算:" + number);return number;}}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {@Overridepublic CompletionStage<Integer> apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = param * 2;System.out.println("第二次运算:" + number);return number;}});}});try {Integer result = future.get();System.out.println("结果是:" + result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
第一次运算:24
第二次运算:48
结果是:48

thenApplythenCompose的区别:

thenApply转换的是泛型中的类型,返回的是同一个CompletableFuture

thenCompose将内部的CompletableFuture调用展开来并使用上一个CompletableFutre调用的结果在下一步的

CompletableFuture调用中进行运算,是生成一个新的CompletableFuture

package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test10 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());return 1.2;});CompletableFuture<String> cf2 = cf.thenCompose((param) -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());return "job3 test";});});System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("cf run result->" + cf.get());System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());System.out.println("cf2 run result->" + cf2.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693043028531
main thread start cf.get(),time->1693043028531
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time->1693043030531
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job2,time->1693043030531
cf run result->1.2
main thread start cf2.get(),time->1693043030534
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job2,time->1693043032532
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job3,time->1693043032533
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job3,time->1693043034534
cf2 run result->job3 test
main thread exit,time->1693043034534

job1执行完成后job2开始执行,等job2执行完成后会把job3返回,然后执行job3,等job3执行完成后,主线程退

出。

7、结果消费(thenAccept和thenAcceptBoth和thenRun)

与结果处理和结果转换系列函数返回一个新的CompletableFuture不同,结果消费系列函数只对结果执行

Action,而不返回新的计算值。

根据对结果的处理方式,结果消费函数又可以分为下面三大类:

thenAccept():对单个结果进行消费。

thenAcceptBoth():对两个结果进行消费。

thenRun():不关心结果,只对结果执行Action

7.1 thenAccept

观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。

thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是无返回值。

常用方法:

public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;public class ThenAcceptTest {public static void main(String[] args) {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一次运算:" + number);return number;}).thenAccept(number -> System.out.println("第二次运算:" + number * 5));}
}
# 程序输出
第一次运算:3
第二次运算:15
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test11 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool = new ForkJoinPool();// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());return 1.2;}, pool);// cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中CompletableFuture cf2 = cf.thenApply((result) -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return "test:" + result;}).thenAccept((result) -> {//接收上一个任务的执行结果作为入参,但是没有返回值System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(result);System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());}).thenRun(() -> {//无入参,也没有返回值System.out.println(Thread.currentThread() + " start job4,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println("thenRun do something");System.out.println(Thread.currentThread() + " exit job4,time->" + System.currentTimeMillis());});System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("run result->" + cf.get());System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());//cf2 等待最后一个thenRun执行完成System.out.println("run result->" + cf2.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}}
# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main] start job1,time->1693043895706
main thread start cf.get(),time->1693043895707
Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time->1693043897706
Thread[ForkJoinPool-1-worker-1,5,main] start job2,time->1693043897706
run result->1.2
main thread start cf2.get(),time->1693043897707
Thread[ForkJoinPool-1-worker-1,5,main] exit job2,time->1693043899706
Thread[ForkJoinPool-1-worker-1,5,main] start job3,time->1693043899706
test:1.2
Thread[ForkJoinPool-1-worker-1,5,main] exit job3,time->1693043901707
Thread[ForkJoinPool-1-worker-1,5,main] start job4,time->1693043901707
thenRun do something
Thread[ForkJoinPool-1-worker-1,5,main] exit job4,time->1693043903707
run result->null
main thread exit,time->1693043903707
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test12 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool = new ForkJoinPool();// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "job1 start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}if (true) {throw new RuntimeException("test");} else {System.out.println(Thread.currentThread() + "job1 exit,time->" + System.currentTimeMillis());return 1.2;}}, pool);//cf执行异常时,将抛出的异常作为入参传递给回调方法CompletableFuture<Double> cf2 = cf.exceptionally((param) -> {System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println("error stack trace->");param.printStackTrace();System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());return -1.1;});//cf正常执行时执行的逻辑,如果执行异常则不调用此逻辑cf2.thenAccept((param) -> {System.out.println(Thread.currentThread() + "job2 start,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println("param->" + param);System.out.println(Thread.currentThread() + "job2 exit,time->" + System.currentTimeMillis());});System.out.println("main thread start,time->" + System.currentTimeMillis());//等待子任务执行完成,此处无论是job2和job3都可以实现job2退出,主线程才退出,如果是cf,则主线程不会等待job2执行完成自动退出了//cf2.get时,没有异常,但是依然有返回值,就是cf的返回值System.out.println("run result->" + cf2.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time->1693044222992
main thread start,time->1693044222993
Thread[ForkJoinPool-1-worker-1,5,main] start,time->1693044224993
error stack trace->
Thread[ForkJoinPool-1-worker-1,5,main] exit,time->1693044226996
Thread[ForkJoinPool-1-worker-1,5,main]job2 start,time->1693044226996
java.util.concurrent.CompletionException: java.lang.RuntimeException: testat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: testat com.test.Test12.lambda$test$0(Test12.java:23)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more
param->-1.1
Thread[ForkJoinPool-1-worker-1,5,main]job2 exit,time->1693044228998
run result->-1.1
main thread exit,time->1693044228999

将上述示例中的if(true) 改成if(false),其输出如下:

# 程序输出
Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time->1693044349555
main thread start,time->1693044349556
Thread[ForkJoinPool-1-worker-1,5,main]job1 exit,time->1693044351556
Thread[ForkJoinPool-1-worker-1,5,main]job2 start,time->1693044351556
param->1.2
Thread[ForkJoinPool-1-worker-1,5,main]job2 exit,time->1693044353559
run result->1.2
main thread exit,time->1693044353559

cf2没有指定,其result就是cf执行的结果,理论上cf2.get应该立即返回的,此处是等待了cf3,即job2执行完成后

才返回。

7.2 thenAcceptBoth

thenAcceptBoth函数的作用是,当两个CompletionStage都正常完成计算的时候,就会执行提供的action

费两个异步的结果。

常用方法:

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);

thenAcceptBoth 将两个任务的执行结果作为方法入参,但是无返回值。

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;public class ThenAcceptBothTest {public static void main(String[] args) {System.out.println("main thread start time->" + System.currentTimeMillis());CountDownLatch countDownLatch = new CountDownLatch(1);CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务1结果:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结果:" + number);return number;}});future1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {@Overridepublic void accept(Integer x, Integer y) {System.out.println("最终结果:" + (x + y));countDownLatch.countDown();}});try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("main thread end time->" + System.currentTimeMillis());}
}
# 程序输出
main thread start time->1693053398891
任务1结果:3
任务2结果:3
最终结果:6
main thread end time->1693053401896

7.3 thenRun

thenRun也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun会在上一阶段

CompletableFuture计算完成的时候执行一个Runnable,而Runnable并不使用该CompletableFuture计算的

结果。

thenRun 的方法没有入参,也没有返回值。

常用方法:

public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);
package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;public class ThenRunTest {public static void main(String[] args) {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenRun(() -> System.out.println("thenRun 执行"));}
}
# 程序输出
第一阶段:8
thenRun 执行

8、结果组合(thenCombine)

8.1 thenCombine

合并两个线程任务的结果,并进一步处理。

常用方法:

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);

thenCombine 会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值。

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Supplier;public class ThenCombineTest {public static void main(String[] args) {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("任务1结果:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("任务2结果:" + number);return number;}});CompletableFuture<Integer> result = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer x, Integer y) {return x + y;}});try {Integer integer = result.get();System.out.println("结果是:" + integer);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
任务1结果:0
任务2结果:4
结果是:4

thenCombine / thenAcceptBoth / runAfterBoth:

这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,

thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同

样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务

中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;public class Test13 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {ForkJoinPool pool = new ForkJoinPool();// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());return 1.2;});CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return 3.2;});//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值CompletableFuture<Double> cf3 = cf.thenCombine(cf2, (a, b) -> {System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());System.out.println("job3 param a->" + a + ",b->" + b);try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());return a + b;});//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值CompletableFuture cf4 = cf.thenAcceptBoth(cf2, (a, b) -> {System.out.println(Thread.currentThread() + " start job4,time->" + System.currentTimeMillis());System.out.println("job4 param a->" + a + ",b->" + b);try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job4,time->" + System.currentTimeMillis());});//cf4和cf3都执行完成后,执行cf5,无入参,无返回值CompletableFuture cf5 = cf4.runAfterBoth(cf3, () -> {System.out.println(Thread.currentThread() + " start job5,time->" + System.currentTimeMillis());try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println("cf5 do something");System.out.println(Thread.currentThread() + " exit job5,time->" + System.currentTimeMillis());});System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("cf run result->" + cf.get());System.out.println("main thread start cf5.get(),time->" + System.currentTimeMillis());System.out.println("cf5 run result->" + cf5.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693053679581
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time->1693053679582
main thread start cf.get(),time->1693053679583
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time->1693053681082
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time->1693053681582
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job4,time->1693053681582
Thread[main,5,main] start job3,time->1693053681582
job3 param a->1.2,b->3.2
job4 param a->1.2,b->3.2
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job4,time->1693053683085
Thread[main,5,main] exit job3,time->1693053683585
Thread[main,5,main] start job5,time->1693053683585
cf5 do something
Thread[main,5,main] exit job5,time->1693053684586
cf run result->1.2
main thread start cf5.get(),time->1693053684586
cf5 run result->null
main thread exit,time->1693053684586

job1 和 job2几乎同时运行,job2比job1先执行完成,等job1退出后,job3和job4几乎同时开始运行,job4先退

出,等job3执行完成,job5开始了,等job5执行完成后,主线程退出。

9、任务交互(applyToEither和acceptEither和runAfterEither和anyOf和allOf和runAfterBoth)

线程交互指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。

9.1 applyToEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。

常用方法:

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);

applyToEither 会将已经执行完成的任务的执行结果作为方法入参,并有返回值。

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;public class ApplyToEitherTest {public static void main(String[] args) {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务1结果:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结果:" + number);return number;}});future1.applyToEither(future2, new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer number) {System.out.println("最快结果:" + number);return number * 2;}});try {Integer result = future1.get();System.out.println("结果是:" + result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
任务2结果:4
最快结果:4
任务1结果:8
结果是:8

9.2 acceptEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。

常用方法:

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);

acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值。

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;public class AcceptEitherTest {public static void main(String[] args) {CountDownLatch countDownLatch = new CountDownLatch(1);CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.acceptEither(future2, new Consumer<Integer>() {@Overridepublic void accept(Integer number) {System.out.println("最快结果:" + number);countDownLatch.countDown();}});try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}
}
# 程序输出
第二阶段:2
最快结果:2

9.3 runAfterEither

两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。

常用方法:

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);

runAfterEither没有方法入参,也没有返回值。

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;public class RunAfterEitherTest {public static void main(String[] args) {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务1结果:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结果:" + number);return number;}});future1.runAfterEither(future2, new Runnable() {@Overridepublic void run() {System.out.println("已经有一个任务完成了");}}).join();}
}
# 程序输出
任务2结果:4
任务1结果:4
已经有一个任务完成了

applyToEither / acceptEither / runAfterEither:

这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于

applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完

成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任

务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test14 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());return 1.2;});CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return 3.2;});//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值CompletableFuture<Double> cf3 = cf.applyToEither(cf2, (result) -> {System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());System.out.println("job3 param result->" + result);try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());return result;});//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值CompletableFuture cf4 = cf.acceptEither(cf2, (result) -> {System.out.println(Thread.currentThread() + " start job4,time->" + System.currentTimeMillis());System.out.println("job4 param result->" + result);try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job4,time->" + System.currentTimeMillis());});//cf4和cf3都执行完成后,执行cf5,无入参,无返回值CompletableFuture cf5 = cf4.runAfterEither(cf3, () -> {System.out.println(Thread.currentThread() + " start job5,time->" + System.currentTimeMillis());try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println("cf5 do something");System.out.println(Thread.currentThread() + " exit job5,time->" + System.currentTimeMillis());});System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("cf run result->" + cf.get());System.out.println("main thread start cf5.get(),time->" + System.currentTimeMillis());System.out.println("cf5 run result->" + cf5.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}
}
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693054246333
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time->1693054246334
main thread start cf.get(),time->1693054246334
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time->1693054247835
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job4,time->1693054247835
job4 param result->3.2
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time->1693054248335
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job3,time->1693054248335
cf run result->1.2
job3 param result->1.2
main thread start cf5.get(),time->1693054248335
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job4,time->1693054249339
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job5,time->1693054249339
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job3,time->1693054250336
cf5 do something
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job5,time->1693054250339
cf5 run result->null
main thread exit,time->1693054250339

job1 和job2 同时开始运行,job2先执行完成,然后job4开始执行,理论上job3和job4应该同时开始运行,但是此

时只有job4开始执行了,job3是等待job1执行完成后才开始执行,job4先于job3执行完成,然后job5开始执行,

等job5执行完成后,主线程退出。

9.4 anyOf

anyOf() 的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个

CompletableFuture

常用方法:

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任

务的执行结果,如果该任务执行异常,则抛出异常。

package com;import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class AnyOfTest {public static void main(String[] args) {Random random = new Random();CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return "hello";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(1));} catch (InterruptedException e) {e.printStackTrace();}return "world";});CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);try {Object object = result.get();System.out.println(object);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
world
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test15 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());return 1.2;});CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return 3.2;});CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());try {Thread.sleep(1300);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());return 2.2;});//allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null//anyOf是只有一个任务执行完成,无论是正常执行或者执行异常,都会执行cf4,cf4.get的结果就是已执行完成的任务的执行结果CompletableFuture cf4 = CompletableFuture.anyOf(cf, cf2, cf3).whenComplete((a, b) -> {if (b != null) {System.out.println("error stack trace->");b.printStackTrace();} else {System.out.println("run succ,result->" + a);}});System.out.println("main thread start cf4.get(),time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("cf4 run result->" + cf4.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}}
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693054996820
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time->1693054996820
Thread[ForkJoinPool.commonPool-worker-3,5,main] start job3,time->1693054996821
main thread start cf4.get(),time->1693054996821
Thread[ForkJoinPool.commonPool-worker-3,5,main] exit job3,time->1693054998122
run succ,result->2.2
cf4 run result->2.2
main thread exit,time->1693054998125

9.5 allOf

allOf方法用来实现多 CompletableFuture 的同时返回。

常用方法:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的

CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class AllOfTest {public static void main(String[] args) {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1完成!");return "future1完成!";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2完成!");return "future2完成!";});CompletableFuture<Void> completableFuture = CompletableFuture.allOf(future1, future2);try {String result = String.valueOf(completableFuture.get());System.out.println(result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
# 程序输出
future2完成!
future1完成!
future1完成!
future2完成!
null
package com.test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class Test16 {public static void main(String[] args) throws ExecutionException, InterruptedException {test();}public static void test() throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());try {Thread.sleep(2000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());return 1.2;});CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(1500);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return 3.2;});CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());try {Thread.sleep(1300);} catch (InterruptedException e) {}
//            throw new RuntimeException("test");System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());return 2.2;});//allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null//anyOf是只有一个任务执行完成,无论是正常执行或者执行异常,都会执行cf4,cf4.get的结果就是已执行完成的任务的执行结果CompletableFuture cf4 = CompletableFuture.allOf(cf, cf2, cf3).whenComplete((a, b) -> {if (b != null) {System.out.println("error stack trace->");b.printStackTrace();} else {System.out.println("run succ,result->" + a);}});System.out.println("main thread start cf4.get(),time->" + System.currentTimeMillis());//等待子任务执行完成System.out.println("cf4 run result->" + cf4.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}}
# 程序输出
Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693055202550
Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time->1693055202550
Thread[ForkJoinPool.commonPool-worker-3,5,main] start job3,time->1693055202550
main thread start cf4.get(),time->1693055202551
Thread[ForkJoinPool.commonPool-worker-3,5,main] exit job3,time->1693055203852
Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time->1693055204051
Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time->1693055204551
run succ,result->null
cf4 run result->null
main thread exit,time->1693055204551

主线程等待最后一个job1执行完成后退出。

9.6 join

package com;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;public class JoinTest {public static void main(String[] args) throws InterruptedException {List<Integer> numberList = new ArrayList<>();for (int i = 1; i < 11; i++) {numberList.add(i);}List<CompletableFuture<?>> futureList = new ArrayList<>();System.out.println("start!");long start = System.currentTimeMillis();for (Integer number : numberList) {futureList.add(CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);System.out.println(number);} catch (InterruptedException e) {e.printStackTrace();}}));}CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])).join();long end = System.currentTimeMillis();System.out.println("end!");System.out.println("耗时:" + (end - start));}
}
# 程序输出
start!
4
1
2
3
5
6
7
8
9
10
end!
耗时:2046
package com;import java.util.concurrent.CompletableFuture;public class JoinTest2 {public static void main(String[] args) {CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {System.out.println("Task 1 started");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Task 1 completed");});CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {System.out.println("Task 2 started");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Task 2 completed");});CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {System.out.println("Task 3 started");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Task 3 completed");});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);allFutures.thenRun(() -> {System.out.println("All tasks completed");});// 防止 JVM 在 CompletableFuture 执行完之前退出try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}
}
# 程序输出
Task 1 started
Task 2 started
Task 3 started
Task 1 completed
Task 2 completed
Task 3 completed
All tasks completed
package com;import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;public class JoinTest3 {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(10);List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, 10).mapToObj(n -> CompletableFuture.runAsync(() -> {System.out.println("done " + n);}, threadPool)).collect(Collectors.toList());futures.forEach(CompletableFuture::join);System.out.println("all done");threadPool.shutdown();}
}
# 程序输出
done 2
done 5
done 4
done 3
done 1
done 6
done 7
done 8
done 9
done 10
all done

9.7 runAfterBoth

runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。

package com;import java.util.concurrent.CompletableFuture;public class RunAfterBothTest {public static void main(String[] args) {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("线程1开始了" + Thread.currentThread().getName());int i = 100 / 10;System.out.println("线程1结束了" + Thread.currentThread().getName());return i;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("线程2开始了" + Thread.currentThread().getName());int i = 100 / 5;System.out.println("线程2结束了" + Thread.currentThread().getName());return i;});// 希望在future1 future2任务执行完之后执行future3// runAfterBothAsync不能获取前面两个线程的返回结果,本身也没有返回结果CompletableFuture<Void> voidCompletableFuture = future1.runAfterBothAsync(future2, () -> {System.out.println("线程3执行了");});}
}
# 程序输出
线程1开始了ForkJoinPool.commonPool-worker-1
线程1结束了ForkJoinPool.commonPool-worker-1
线程2开始了ForkJoinPool.commonPool-worker-2
线程2结束了ForkJoinPool.commonPool-worker-2
线程3执行了

10、CompletableFuture常用方法总结

在这里插入图片描述

11、实现最优的烧水泡茶程序

著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这

样:

在这里插入图片描述

11.1 基于Future实现

package com;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;/*** 烧茶案例*/
public class FutureTaskTest {public static void main(String[] args) throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();// 创建任务T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 创建任务T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 线程T1执行任务ft2Thread T1 = new Thread(ft2);T1.start();// 线程T2执行任务ft1Thread T2 = new Thread(ft1);T2.start();// 等待线程T1执行结果System.out.println(ft1.get());long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start));}
}// T1Task需要执行的任务
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {FutureTask<String> ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTask<String> ft2) {this.ft2 = ft2;}@Overridepublic String call() throws Exception {// 洗水壶System.out.println("T1:洗水壶...");TimeUnit.SECONDS.sleep(1);// 烧开水System.out.println("T1:烧开水...");TimeUnit.SECONDS.sleep(15);// 获取T2线程的茶叶String tf = ft2.get();System.out.println("T1:拿到茶叶:" + tf);// 泡茶System.out.println("T1:泡茶...");return "上茶:" + tf;}
}// T2Task需要执行的任务
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {// 洗茶壶System.out.println("T2:洗茶壶...");TimeUnit.SECONDS.sleep(1);// 洗茶杯System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);// 拿茶叶System.out.println("T2:拿茶叶...");TimeUnit.SECONDS.sleep(1);return "龙井";}
}
# 程序输出
T2:洗茶壶...
T1:洗水壶...
T2:洗茶杯...
T1:烧开水...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井
耗时:16003

11.2 基于CompletableFuture实现

package com;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureTest {public static void main(String[] args) {long start = System.currentTimeMillis();// 任务1:洗水壶->烧开水CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {// 洗水壶System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);// 烧开水System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);});// 任务2:洗茶壶->洗茶杯->拿茶叶CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {// 洗茶壶System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);// 洗茶杯System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);// 拿茶叶System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";});// 任务3:任务1和任务2完成后执行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (a, b) -> {System.out.println("T1:拿到茶叶:" + b);System.out.println("T1:泡茶...");return "上茶:" + b;});//等待任务3执行结果System.out.println(f3.join());long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start));}static void sleep(int t, TimeUnit u) {try {u.sleep(t);} catch (InterruptedException e) {}}}

结果:

# 程序输出
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井
耗时:16055

相关文章:

JAVA中使用CompletableFuture进行异步编程

JAVA中使用CompletableFuture进行异步编程 1、什么是CompletableFuture CompletableFuture 是 JDK8 提供的 Future 增强类&#xff0c;CompletableFuture 异步任务执行线程池&#xff0c;默认是把异步任 务都放在 ForkJoinPool 中执行。 在这种方式中&#xff0c;主线程不会…...

uniapp:配置动态接口域名,根据图片访问速度,选择最快的接口

common.js // 动态测速选择的域名 // h5直接返回默认第一个域名 // vue文件用到域名的话用this.$baseURL let domains [{uri:192.168.31.215:9523, speed:0},{uri:api.ceshi.org, speed:0}, ]export const protocol {api: http://,//本地// api: https://api.,//正式h5Url: h…...

Lambda表达式常见用法(提高效率神器)

Java8中一个非常重要的特性就是Lambda表达式&#xff0c;我们可以把它看成是一种闭包&#xff0c;它允许把函数当做参数来使用&#xff0c;是面向函数式编程的思想&#xff0c;一定程度上可以使代码看起来更加简洁。 其实以上都不重要&#xff0c;重要的是能够提高我的开发效率…...

2023旷视自驾感知算法暑期实习一面

来源&#xff1a;投稿 作者&#xff1a;LSC 编辑&#xff1a;学姐 1. 问下项目&#xff0c;问下我的情况 2. 是否了解最新的BEV算法&#xff0c;讲一下 3. 是否了解三维重建 4. 考察相机坐标系的转换 5. 手撕代码&#xff0c;翻车了&#xff0c;不考leetcode&#xff0c;考…...

Python3 如何实现 websocket 服务?

Python 实现 websocket 服务很简单&#xff0c;有很多的三方包可以用&#xff0c;我从网上大概找到三种常用的包&#xff1a;websocket、websockets、Flask-Sockets。 但这些包很多都“年久失修”&#xff0c; 比如 websocket 在 2010 年就不维护了。 而 Flask-Sockets 也在 2…...

SQLAlchemy常用数据类型

目录 SQLAlchemy常用数据类型 代码演示 代码分析 SQLAlchemy常用数据类型 SQLAlchemy 是一个Python的SQL工具库和对象关系映射(ORM)工具&#xff0c;它提供了一种在Python中操作数据库的高效方式。下面是SQLAlchemy中常用的一些数据类型&#xff1a; Integer&#xff1a;整形&…...

Vue路由与nodejs下载安装及环境变量的配置

目录 前言 一、Vue路由 1.路由简介 是什么 作用 应用场景 2.SPA简介 SPA是什么 SPA的优点 注意事项 3.路由实现思路 1.引入路由的js依赖 2.定义组件 3.定义组件与路径的对应关系 4.通过路由关系获取路由对象router 5.将路由对象挂载到实例中 6.触发路由事…...

HarmonyOS之 应用程序页面UIAbility

一 UIAbility介绍&#xff1a; 1.1 UIAbility是一种包含用户界面的应用组件&#xff0c;主要用于和用户进行交互 1.2 UIAbility也是系统调度的单元&#xff0c;为应用提供窗口在其中绘制界面 二 UIAbility跳转和传参 2.1 页面间的导航可以通过页面路由router模块来实现。页…...

数据集笔记: Porto

数据来源&#xff1a;Taxi Trajectory Data_数据集-阿里云天池 (aliyun.com) 1 数据介绍 葡萄牙波尔图市运行的所有442辆出租车的全年轨迹&#xff08;从2013年7月1日至2014年6月30日&#xff09; 2 读取数据 import pandas as pdtrapd.read_csv(C:/Users/16000/Download…...

修改vscode底部栏背景和字体颜色

修改vscode底部栏背景和字体颜色 如图&#xff1a; 首先打开齿轮&#xff0c;打开设置搜索workbench.colorCustomizations,然后点击编辑setting.json修改setting.json内内容 "workbench.colorCustomizations": {"statusBar.foreground": "#FFFFFF…...

加速企业AI实施:成功策略和效率方法

文章目录 写在前面面临的挑战MlOps简介好书推荐 写作末尾 写在前面 作为计算机科学领域的一个关键分支&#xff0c;机器学习在当今人工智能领域中占据着至关重要的地位&#xff0c;广受瞩目。机器学习通过深入分析大规模数据并总结其中的规律&#xff0c;为我们提供了解决许多…...

【图论C++】树的重心——教父POJ 3107(链式前向星的使用)

》》》算法竞赛 /*** file * author jUicE_g2R(qq:3406291309)————彬(bin-必应)* 一个某双流一大学通信与信息专业大二在读 * * brief 一直在竞赛算法学习的路上* * copyright 2023.9* COPYRIGHT 原创技术笔记&#xff1a;转载…...

hhh百度地铁广告太搞笑了;24家国内大模型公司面经;LLM法律应用实践;AI+教育产品图谱与工作流 | ShowMeAI日报

&#x1f440;日报&周刊合集 | &#x1f3a1;生产力工具与行业应用大全 | &#x1f9e1; 点赞关注评论拜托啦&#xff01; &#x1f525; 会玩儿&#xff01;承包地铁专列&#xff0c;真人移动广告 | 百度世界大会预热 百度也是会玩儿&#xff01;承包了北京地铁一号线的「…...

项目管理:项目经理一定要避开这四大误区

项目经理要保质保量按时达成项目目标&#xff0c;需要关注项目的方方面面&#xff0c;要具有很强的沟通协调能力和目标意识。但是项目经理也不免不了失误&#xff0c;管理中的这四大误区&#xff0c;你经历过几个&#xff1f; 误区一&#xff1a;做不该做的事 你是否遇到这种…...

爬虫为什么需要 HTTP 代理 IP?

前言 爬虫在互联网数据采集、分析和挖掘中扮演着至关重要的角色&#xff0c;但是对于目标网站而言&#xff0c;频繁的爬虫请求可能会对其服务器产生不小的负担&#xff0c;严重的情况甚至会导致网站崩溃或者访问受限。为了避免这种情况的发生&#xff0c;同时也为了保护客户端…...

leetcode刷题笔记/代码随想录笔记——移除字符串中多余空格

1. 使用erase()函数 void removeExtraSpaces(string& s) {for (int i s.size() - 1; i > 0; i--) {if (s[i] s[i - 1] && s[i] ) {s.erase(s.begin() i);}}// 删除字符串最后面的空格if (s.size() > 0 && s[s.size() - 1] ) {s.erase(s.begi…...

dataGrip导出导入的方式

导出&#xff1a;选中需要导出的表 导入&#xff1a;选中导出的sql文件...

LeetCode279. 完全平方数

279. 完全平方数 文章目录 [279. 完全平方数](https://leetcode.cn/problems/perfect-squares/)一、题目二、题解方法一&#xff1a;完全背包二维数组方法二&#xff1a;一维数组&#xff08;空间复杂度更小的改进版本,最下面的两个版本不需要存储完全平方数&#xff09; 一、题…...

【CMake】add_dependencies 命令

【CMake】add_dependencies 原文链接&#xff1a;https://blog.csdn.net/new9232/article/details/125831009 参考链接&#xff1a;https://blog.csdn.net/new9232/article/details/121374943 简介 add_dependencies(<target> [<target-dependency>]...)官方文档…...

go语言unsafe.Pointer与uintptr

以下内容来源go语言圣经 1、unsafe.Pointer&#xff0c;相当于c语言中的void *类型的指针&#xff0c;如果需要运算需要转成uintptr类型的指针 2. uintptr uintptr是一个无符号的整型&#xff0c;它可以保存一个指针地址。 它可以进行指针运算。 uintptr无法持有对象, GC不把…...

ddos打到高防cdn上会发生什么

ddos打到cdn上会发生什么?当DDoS攻击打到CDN上时&#xff0c;肯定会影响网站的可用性和用户体验。具体DDoS攻击打到CDN上时&#xff0c;会发生以下情况&#xff1a; CDN节点负载增加&#xff1a;DDoS攻击会导致大量的无效流量涌入CDN节点&#xff0c;从而使得节点负载增加。这…...

【单调栈】503. 下一个更大元素 II

503. 下一个更大元素 II 解题思路 参考496. 下一个更大元素 I 首先计算nums2的每一个元素的下一个比他大的元素&#xff0c;使用单调栈 将上面的结果和nums2中的每一个元素组成映射map 针对每一个Nums1的元素 查询map 记录map 的value 但是这个是循环的数组元素 class So…...

C++ decltype类型

文章目录 1. 工作原理2. decltype 变量3. decltype 表达式4. decltype 函数 1. 工作原理 随着程序越来越复杂&#xff0c;程序中用到的类型也越来越多&#xff0c;我们有时候不得不去翻阅大量上下文去寻找此数据的类型。   decltype就是一种类型说明符&#xff0c;它的出现…...

【题解】JZOJ3854 分组

JZOJ 3854 题意 有 n n n 个人&#xff0c;每个人有地位 r i r_i ri​ 和年龄 a i a_i ai​&#xff0c;对于一个若干人组成的小组&#xff0c;定义其队长为地位最高的成员&#xff08;若相等则取二者均可&#xff09;&#xff0c;其他成员的年龄与队长的差不能超过 k k …...

区块链实验室(26) - 区块链期刊Blockchain: Research and Applications

Elsevier出版物“Blockchain: Research and Applications”是浙江大学编审的期刊。该期刊自2020年创刊&#xff0c;并出版第1卷。每年出版4期&#xff0c;最新期是第4卷第3期(2023年9月)。 目前没有官方的IF&#xff0c;Elsevier的引用因子Citescore是6.4。 虽然是新刊&#xf…...

【学习笔记】[ARC153F] Tri-Colored Paths

假设三种颜色的边都存在&#xff0c;并且不存在这样的路径 首先观察到&#xff0c;对于一个简单环上的边&#xff0c;颜色一定相同 因此&#xff0c;考虑建立圆方树&#xff0c;问题转化为圆方树上的 D P DP DP问题。限制是对于方点所连接的边&#xff0c;必须涂上相同的颜色…...

基于SSM的实习管理系统

基于SSM的实习管理系统、前后端分离 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringSpringMVCMyBatisVue工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 管理员界面 教师 学生 研究背景 基于SSM的实习管理系统是一个基于Spring、Spring…...

在Vue中通过ElementUI构建前端页面【登录,注册】,在IEDA构建后端实现前后端分离

一.ElementUI组件入门 1.对于ElementUI的理解 是一套基于 Vue.js 的开源UI组件库&#xff0c;提供了丰富的可复用组件&#xff0c;可以帮助开发者快速构建美观、易用的前端界面 2.Element UI 的特点和优势 多样化的组件&#xff1a;Element UI 提供了众多常用的基础组件&#…...

TX2 open ttyTHS2

TX2 open ttyTHS2 #冷风那个吹# 于 2019-04-01 14:10:43 发布 1749 收藏 6 分类专栏: 平时问题积累 TX2 版权 平时问题积累 同时被 2 个专栏收录 22 篇文章0 订阅 订阅专栏 TX2 30 篇文章8 订阅 订阅专栏 TX2上有5个串口,但是ttyTHS1是调试串口,ttyTHS3是蓝牙,ttyTHS…...

conan入门(二十八):解决conan 1.60.0下 arch64-linux-gnu交叉编译openssl/3.1.2报错问题

上一篇博客《conan入门(二十七):因profile [env]字段废弃导致的boost/1.81.0 在aarch64-linux-gnu下交叉编译失败》解决了conan 1.60.0交叉编译boost/1.80.1的问题后&#xff0c;我继续交叉编译openssl/3.1.2时又报错了 conan install openssl/3.1.2 -pr:h aarch64-linux-gnu.…...

网站模板的缺点/免费网页设计制作网站

ShareSDK&#xff1b; 友盟&#xff1b; 百度分享&#xff1b; //支付宝支付 1&#xff0c;seller id&#xff1a; 2&#xff0c;partner id: 3&#xff0c;加密文件(公钥、私钥) 4&#xff0c;下载SDK&#xff08;网页版、无线版--支付宝论坛&#xff09; 5&#xff0c;scheme…...

上海装修公司哪家最好/seo排名第一的企业

题目&#xff1a;原题链接&#xff08;中等&#xff09; 标签&#xff1a;数组、哈希表 解法时间复杂度空间复杂度执行用时Ans 1 (Python)O(N)O(N)O(N)L(N)L(N)L(N)160ms (87.84%)Ans 2 (Python)Ans 3 (Python) 解法一&#xff1a; class Solution:def findLongestSubarray(…...

网页设计作业样例/seo基础教程

光敏电阻器&#xff08;photoresistor&#xff09;又叫光感电阻&#xff0c;它的电阻值会根据光的强度变化而变化。 现在我要把它接入树莓派Pico&#xff0c;观察电阻值随光强度的变化情况&#xff0c;本试验参考的文章&#xff1a;https://peppe8o.com/how-to-use-a-photoresi…...

大学网站建设目标/百度联盟推广

ffmpeg.exe -i F:\闪客之家\闪客之歌.mp3 -ab 56 -ar 22050 -b 500 -r 15 -s 320x240 f:\11.flv ffmpeg -i F:\01.wmv -ab 56 -ar 22050 -b 500 -r 15 -s 320x240 f:\test.flv 使用-ss参数 作用&#xff08;time_off set the start time offset&#xff09;&#xff0c;可以从指…...

寻找网站建设 网站外包/嘉兴网站建设

第一种方法&#xff1a; 1、详细查询命令&#xff1a; 查看cpu最大进程&#xff0c;或者内存最大进程。 #CPU ps aux|head -1;ps aux|grep -v PID|sort -rn -k 3|head #内存 ps aux|head -1;ps aux|grep -v PID|sort -rn -k 4|head显示如下&#xff1a; ubuntuubuntu:~$ ps…...

汽车网址导航大全/网络优化工程师

摘要&#xff1a;由汽车之家实时计算平台负责人邸星星在 4 月 17 日上海站 Meetup 分享的&#xff0c;基于 Flink Iceberg 的湖仓一体架构实践&#xff0c;内容包括&#xff1a;数据仓库架构升级的背景基于 Iceberg 的湖仓一体架构实践总结与收益后续规划Tips&#xff1a;点击…...