当前位置: 首页 > news >正文

【日常业务开发】Java实现异步编程

【日常业务开发】Java实现异步编程

  • Java实现异步编程
  • 什么是异步
  • 异步的八种实现方式
  • 异步编程
    • 线程异步
    • Future异步
    • CompletableFuture实现异步
    • Spring的@Async异步
    • Spring ApplicationEvent事件实现异步
    • 消息队列
    • ThreadUtil异步工具类
    • Guava异步
  • CompletableFuture异步编排工具类
    • 创建异步对象
      • runAsync和 supplyAsync
    • 计算完成时回调方法
    • handle 方法
    • 线程串行化方法
      • thenApply
      • thenAccept
      • thenRun
      • thenCompose 方法
    • 两任务组合 - 都要完成
      • thenCombine 合并任务
      • thenAcceptBoth
      • runAfterBoth
    • 两任务组合 - 一个完成
      • applyToEither 方法
      • acceptEither 方法
      • runAfterEither 方法
    • 多任务组合
    • 实际业务场景

Java实现异步编程

异步执行对于开发者来说并不陌生,在实际的开发过程中,很多场景多会使用到异步,相比同步执行,异步可以大大缩短请求链路耗时时间,比如:发送短信、邮件、异步更新等,这些都是典型的可以通过异步实现的场景。

什么是异步

首先我们先看一个常见的用户下单的场景:
在这里插入图片描述

在同步操作中,我们执行到 发送短信 的时候,我们必须等待这个方法彻底执行完才能执行 赠送积分 这个操作,如果 赠送积分 这个动作执行时间较长,发送短信需要等待,这就是典型的同步场景。

实际上,发送短信和赠送积分没有任何的依赖关系,通过异步,我们可以实现赠送积分发送短信这两个操作能够同时进行,比如:

在这里插入图片描述

异步的八种实现方式

  1. 线程Thread
  2. Future
  3. 异步框架CompletableFuture
  4. Spring注解@Async
  5. Spring ApplicationEvent事件
  6. 消息队列
  7. 第三方异步框架,比如Hutool的ThreadUtil
  8. Guava异步

异步编程

线程异步

public class AsyncThread extends Thread {@Overridepublic void run() {System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");}public static void main(String[] args) {AsyncThread asyncThread = new AsyncThread();asyncThread.run();}
}

当然如果每次都创建一个Thread线程,频繁的创建、销毁,浪费系统资源,我们可以采用线程池:

private ExecutorService executorService = Executors.newCachedThreadPool();public void fun() {executorService.submit(new Runnable() {@Overridepublic void run() {log.info("执行业务逻辑...");}});
}

可以将业务逻辑封装到RunnableCallable中,交由线程池来执行。

Future异步

@Slf4j
public class FutureManager {public String execute() throws Exception {ExecutorService executor = Executors.newFixedThreadPool(1);Future<String> future = executor.submit(new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println(" --- task start --- ");Thread.sleep(3000);System.out.println(" --- task finish ---");return "this is future execute final result!!!";}});//这里需要返回值时会阻塞主线程String result = future.get();log.info("Future get result: {}", result);return result;}@SneakyThrowspublic static void main(String[] args) {FutureManager manager = new FutureManager();manager.execute();}
}

Future的不足之处的包括以下几点:

  1. 无法被动接收异步任务的计算结果:虽然我们可以主动将异步任务提交给线程池中的线程来执行,但是待异步任务执行结束之后,主线程无法得到任务完成与否的通知,它需要通过get方法主动获取任务执行的结果。
  2. Future件彼此孤立:有时某一个耗时很长的异步任务执行结束之后,你想利用它返回的结果再做进一步的运算,该运算也会是一个异步任务,两者之间的关系需要程序开发人员手动进行绑定赋予,Future并不能将其形成一个任务流(pipeline),每一个Future都是彼此之间都是孤立的,所以才有了后面的CompletableFuture,CompletableFuture就可以将多个Future串联起来形成任务流。
  3. Futrue没有很好的错误处理机制:截止目前,如果某个异步任务在执行发的过程中发生了异常,调用者无法被动感知,必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误,从而在做进一步的判断处理。

CompletableFuture实现异步

public class CompletableFutureCompose {/*** thenAccept子任务和父任务公用同一个线程*/@SneakyThrowspublic static void thenRunAsync() {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {System.out.println(Thread.currentThread() + " cf2 do something...");});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());}public static void main(String[] args) {thenRunAsync();}
}

我们不需要显式使用ExecutorService,CompletableFuture 内部使用了ForkJoinPool来处理异步任务,如果在某些业务场景我们想自定义自己的异步线程池也是可以的。

Spring的@Async异步

自定义异步线程池

/*** 线程池参数配置,多个线程池实现线程池隔离,@Async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@Async("taskName")***/
@EnableAsync
@Configuration
public class TaskPoolConfig {/*** 自定义线程池** @author: jacklin* @since: 2021/11/16 17:41**/@Bean("taskExecutor")public Executor taskExecutor() {//返回可用处理器的Java虚拟机的数量 12int i = Runtime.getRuntime().availableProcessors();System.out.println("系统最大线程数  : " + i);ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//核心线程池大小executor.setCorePoolSize(16);//最大线程数executor.setMaxPoolSize(20);//配置队列容量,默认值为Integer.MAX_VALUEexecutor.setQueueCapacity(99999);//活跃时间executor.setKeepAliveSeconds(60);//线程名字前缀executor.setThreadNamePrefix("asyncServiceExecutor -");//设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行executor.setAwaitTerminationSeconds(60);//等待所有的任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);return executor;}
}
public interface AsyncService {MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);MessageResult sendEmail(String email, String subject, String content);
}@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {@Autowiredprivate IMessageHandler mesageHandler;@Override@Async("taskExecutor")public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content) {try {Thread.sleep(1000);mesageHandler.sendSms(callPrefix, mobile, actionType, content);} catch (Exception e) {log.error("发送短信异常 -> ", e)}}@Override@Async("taskExecutor")public sendEmail(String email, String subject, String content) {try {Thread.sleep(1000);mesageHandler.sendsendEmail(email, subject, content);} catch (Exception e) {log.error("发送email异常 -> ", e)}}
}

在实际项目中, 使用@Async调用线程池,推荐等方式是是使用自定义线程池的模式,不推荐直接使用@Async直接实现异步。

Spring ApplicationEvent事件实现异步

定义事件

public class AsyncSendEmailEvent extends ApplicationEvent {/*** 邮箱**/private String email;/*** 主题**/private String subject;/*** 内容**/private String content;/*** 接收者**/private String targetUserId;}

定义事件处理器

@Slf4j
@Component
public class AsyncSendEmailEventHandler implements ApplicationListener<AsyncSendEmailEvent> {@Autowiredprivate IMessageHandler mesageHandler;@Async("taskExecutor")@Overridepublic void onApplicationEvent(AsyncSendEmailEvent event) {if (event == null) {return;}String email = event.getEmail();String subject = event.getSubject();String content = event.getContent();String targetUserId = event.getTargetUserId();mesageHandler.sendsendEmailSms(email, subject, content, targerUserId);}
}

另外,可能有些时候采用ApplicationEvent实现异步的使用,当程序出现异常错误的时候,需要考虑补偿机制,那么这时候可以结合Spring Retry重试来帮助我们避免这种异常造成数据不一致问题。

消息队列

生产者

@Slf4j
@SpringBootTest
public class ProducerRocketMqBootApiTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送的是同步消息* rocketMQTemplate.syncSend()* rocketMQTemplate.send()* rocketMQTemplate.convertAndSend()* 这三种发送消息的方法,底层都是调用syncSend*//*** 测试发送简单的消息** @throws Exception*/@Testpublic void testSimpleMsg() {SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, "我是一个同步简单消息");System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());}
}

消费者

@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}")
public class BaseConsumerListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {@Autowiredprivate MessageMapper messageMapper;//    @Autowired
//    private BitMapBloomFilter bitMapBloomFilter;@Autowiredprivate ApplicationContext applicationContext;@Overridepublic void onMessage(MessageExt message) {String topic = message.getTopic();String tag = message.getTags();byte[] body = message.getBody();String keys = message.getKeys();String msgId = message.getMsgId();String realTopic = message.getProperty("REAL_TOPIC");String originMessageId = message.getProperty("ORIGIN_MESSAGE_ID");// 获取重试的次数 失败一次消息中的失败次数会累加一次int reconsumeTimes = message.getReconsumeTimes();String jsonBody = JackJsonUtil.toJSONString((new String(body)));log.info("消息监听类: msgId:{},topic:{}, tag:{}, body:{},keys:{},realTopic:{},originMessageId:{},reconsumeTimes:{}", msgId, topic, tag, jsonBody, keys, realTopic, originMessageId, reconsumeTimes);// 布隆过滤器进行去重
//        if (bitMapBloomFilter.contains(keys)) {
//            return;
//        }
//        bitMapBloomFilter.add(keys);// 消费者幂等处理: 设计去重表,防止重复消费messageMapper.insert(buildMessage(message));applicationContext.publishEvent(new BaseEvent(tag, jsonBody));}private Message buildMessage(MessageExt messageExt) {Message message = new Message();message.setMsgId(messageExt.getMsgId());message.setMsgTopic(messageExt.getTopic());message.setMsgTag(messageExt.getTags());message.setMsgBody(JackJsonUtil.toJSONString((new String(messageExt.getBody()))));// 判断是否是重试消息String realTopic = messageExt.getProperty("REAL_TOPIC");String originMessageId = messageExt.getProperty("ORIGIN_MESSAGE_ID");if (StrUtil.isNotBlank(realTopic) && StrUtil.isNotBlank(originMessageId) ) {message.setMsgType("2");message.setMsgKeys(messageExt.getKeys()+":"+originMessageId+":"+IdUtil.fastUUID());} else {message.setMsgType("1");message.setMsgKeys(messageExt.getKeys());}message.setMsgRetryId(originMessageId);message.setMsgRetryTopic(realTopic);message.setCreateTime(new Date());return message;}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 设置最大重试次数consumer.setMaxReconsumeTimes(3);// 如下,设置其它consumer相关属性consumer.setPullBatchSize(16);}
}

ThreadUtil异步工具类

@Slf4j
public class ThreadUtils {public static void main(String[] args) {for (int i = 0; i < 3; i++) {ThreadUtil.execAsync(() -> {ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();int number = threadLocalRandom.nextInt(20) + 1;System.out.println(number);});log.info("当前第:" + i + "个线程");}log.info("task finish!");}
}

Guava异步

GuavaListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用Guava ListenableFuture可以帮我们检测Future是否完成了,不需要再通过get()方法苦苦等待异步的计算结果,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

ListenableFuture是一个接口,它从jdkFuture接口继承,添加了void addListener(Runnable listener, Executor executor)方法。

我们看下如何使用ListenableFuture。首先需要定义ListenableFuture的实例:

 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.info("callable execute...")TimeUnit.SECONDS.sleep(1);return 1;}});

首先通过MoreExecutors类的静态方法listeningDecorator方法初始化一个ListeningExecutorService的方法,然后使用此实例的submit方法即可初始化ListenableFuture对象。

ListenableFuture要做的工作,在Callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字1,有了ListenableFuture实例,可以执行此Future并执行Future完成之后的回调函数。

 Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {@Overridepublic void onSuccess(Integer result) {//成功执行...System.out.println("Get listenable future's result with callback " + result);}@Overridepublic void onFailure(Throwable t) {//异常情况处理...t.printStackTrace();}
});

CompletableFuture异步编排工具类

在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。

创建异步对象

runAsync和 supplyAsync

CompletableFuture 提供了四个静态方法来创建一个异步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

计算完成时回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情

whenCompletewhenCompleteAsync 的区别:

  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

public static void whenComplete() throws Exception {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}if(new Random().nextInt()%2>=0) {int i = 12/0;}System.out.println("run end ...");});future.whenComplete(new BiConsumer<Void, Throwable>() {@Overridepublic void accept(Void t, Throwable action) {System.out.println("执行完成!");}});future.exceptionally(new Function<Throwable, Void>() {@Overridepublic Void apply(Throwable t) {System.out.println("执行失败!"+t.getMessage());return null;}});TimeUnit.SECONDS.sleep(2);
}

handle 方法

handle 是执行任务完成时对结果的处理。

handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
public static void handle() throws Exception{CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int i= 10/0;return new Random().nextInt(10);}}).handle(new BiFunction<Integer, Throwable, Integer>() {@Overridepublic Integer apply(Integer param, Throwable throwable) {int result = -1;if(throwable==null){result = param * 2;}else{System.out.println(throwable.getMessage());}return result;}});System.out.println(future.get());
}

线程串行化方法

thenApply

当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

Function

  • T:上一个任务返回结果的类型
  • U:当前任务的返回值类型
private static void thenApply() throws Exception {CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {@Overridepublic Long get() {long result = new Random().nextInt(100);System.out.println("result1="+result);return result;}}).thenApply(new Function<Long, Long>() {@Overridepublic Long apply(Long t) {long result = t*5;System.out.println("result2="+result);return result;}});long result = future.get();System.out.println(result);
}

thenAccept

消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public static void thenAccept() throws Exception{CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {return new Random().nextInt(10);}}).thenAccept(integer -> {System.out.println(integer);});future.get();
}

thenRun

只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作

public static void thenRun() throws Exception{CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {return new Random().nextInt(10);}}).thenRun(() -> {System.out.println("thenRun ...");});future.get();
}

thenCompose 方法

thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

private static void thenCompose() throws Exception {CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);System.out.println("t1="+t);return t;}}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {@Overridepublic CompletionStage<Integer> apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = param *2;System.out.println("t2="+t);return t;}});}});System.out.println("thenCompose result : "+f.get());
}

两任务组合 - 都要完成

thenCombine 合并任务

组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

private static void thenCombine() throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "hello";}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "hello";}});CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() {@Overridepublic String apply(String t, String u) {return t+" "+u;}});System.out.println(result.get());
}

thenAcceptBoth

组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值

private static void thenAcceptBoth() throws Exception {CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f1="+t);return t;}});CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f2="+t);return t;}});f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {@Overridepublic void accept(Integer t, Integer u) {System.out.println("f1="+t+";f2="+u+";");}});
}

runAfterBoth

组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务

private static void runAfterBoth() throws Exception {CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f1="+t);return t;}});CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f2="+t);return t;}});f1.runAfterBoth(f2, new Runnable() {@Overridepublic void run() {System.out.println("上面两个任务都执行完成了。");}});
}

两任务组合 - 一个完成

applyToEither 方法

两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

private static void applyToEither() throws Exception {CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f1="+t);return t;}});CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f2="+t);return t;}});CompletableFuture<Integer> result = f1.applyToEither(f2, new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer t) {System.out.println(t);return t * 2;}});System.out.println(result.get());
}

acceptEither 方法

两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

private static void acceptEither() throws Exception {CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f1="+t);return t;}});CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f2="+t);return t;}});f1.acceptEither(f2, new Consumer<Integer>() {@Overridepublic void accept(Integer t) {System.out.println(t);}});
}

runAfterEither 方法

两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值

private static void runAfterEither() throws Exception {CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f1="+t);return t;}});CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int t = new Random().nextInt(3);try {TimeUnit.SECONDS.sleep(t);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("f2="+t);return t;}});f1.runAfterEither(f2, new Runnable() {@Overridepublic void run() {System.out.println("上面有一个已经完成了。");}});
}

多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

allOf:等待所有任务完成
anyOf:只要有一个任务完成

实际业务场景

在这里插入图片描述

假设要求:

第一,要先拿到商品的基本信息 基本信息里面有 销售id 和 销售规格id

第二,拿到商品基本信息后 可以 根据商品基本信息 异步去获取 促销、销售属性、规格参数等信息

第三,图片信息和商品基本没有上下关联关系 可以同时异步获取

第四,所以信息全部查询完成后一起返回

@Test
public void test16() throws ExecutionException, InterruptedException {//所有信息的汇总SkuiVo skuiVo = new SkuiVo();//查询基本信息(带返回值的异步)CompletableFuture<Skuinfo> infoFuture = CompletableFuture.supplyAsync(() -> {//假设查询到了商品基本信息Skuinfo skuinfo = new Skuinfo();skuinfo.setSpuId("1");skuinfo.setSpuId("2");skuinfo.setGuiId("3");skuiVo.setSkuinfo(skuinfo);return skuinfo;},executor);//查到基本信息后 异步同时去查 促销信息 规格信息 销售属性信息//拿到查基本信息任务的返回值 任务本身无需返回值CompletableFuture<Void> saleCxFuture = infoFuture.thenAcceptAsync((res) -> {String spuId = res.getSpuId();//拿到商品的销售id后查促销信息skuiVo.setSaleCx("促销信息");}, executor);CompletableFuture<Void> saleSxFuture = infoFuture.thenAcceptAsync((res) -> {String spuId = res.getSpuId();//拿到商品的销售id后查销售属性skuiVo.setSaleSx("销售属性信息");}, executor);CompletableFuture<Void> saleGgFuture = infoFuture.thenAcceptAsync((res) -> {String spuId = res.getSpuId();String guiId = res.getGuiId();//拿到商品的销售id和规格id 查商品规格信息skuiVo.setSaleGg("商品规格信息");}, executor);//查基本信息的时候 同时异步 也根据商品id 查商品图片信息//这个任务不需要返回值CompletableFuture<Void> imageFuture= CompletableFuture.runAsync(() -> {//查商品图片信息skuiVo.setImages("商品图片信息");}, executor);//等待所有任务都完成CompletableFuture.allOf(saleCxFuture,saleSxFuture,saleGgFuture,imageFuture).get();System.out.println(skuiVo);
}

相关文章:

【日常业务开发】Java实现异步编程

【日常业务开发】Java实现异步编程 Java实现异步编程什么是异步异步的八种实现方式异步编程线程异步Future异步CompletableFuture实现异步Spring的Async异步Spring ApplicationEvent事件实现异步消息队列ThreadUtil异步工具类Guava异步 CompletableFuture异步编排工具类创建异步…...

学习笔记|模数转换器|ADC原理|STC32G单片机视频开发教程(冲哥)|第十七集:ADC采集

文章目录 1.模数转换器&#xff08;ADC&#xff09;是什么&#xff1f;手册说明&#xff1a; 2.STC32G单片机ADC使用原理19.1.1 ADC控制寄存器&#xff08;ADC_CONTR)19.1.2 ADC配置寄存器&#xff08;ADCCFG)19.1.4ADC时序控制寄存器&#xff08;ADCTIM&#xff09;19.3 ADC相…...

OpenCV实现“蓝线挑战“特效

原理 算法原理可以分为三个流程&#xff1a; 1、将视频&#xff08;图像&#xff09;从&#xff08;顶->底&#xff09;或&#xff08;左->右&#xff09;逐行&#xff08;列&#xff09;扫描图像。 2、将扫描完成的行&#xff08;列&#xff09;像素重新生成定格图像…...

容器管理工具 Docker生态架构及部署

目录 一、Docker生态架构 1.1 Docker Containers Are Everywhere 1.2 生态架构 1.2.1 Docker Host 1.2.2 Docker daemon 1.2.3 Registry 1.2.4 Docker client 1.2.5 Image 1.2.6 Container 1.2.7 Docker Dashboard 1.3 Docker版本 二、Docker部署 2.1 使用YUM源部署…...

js判断数据类型的方法

简单数据类型用&#xff1a;typeof&#xff0c; // 可以直接typeof空格接数据的方式,也可以typeof(数据)的方式使用 console.log(typeof ""); //string(检验字符串没问题) console.log(typeof 1); //number(检验数字没问题) console.log(typ…...

达梦数据库随系统开机自动启动脚本

写一个脚本&#xff0c;实现在服务器开机后自动启动达梦数据库的功能。 1. 在/etc/init.d/目录下&#xff0c;编写脚本&#xff0c;并将脚本命名为startdm.sh。脚本内容实现如下&#xff1a; #!/bin/bash #chkconfig:2345 80 90 #decription:启动达梦# 切换到 dmdba 用户 su …...

Python开发利器之VS Code

Python官方提供了一个Python集成开发环境&#xff08;IDE&#xff09;&#xff1a; IDLE (Integrated Development and Learning Environment)。 它提供了一个图形用户界面&#xff0c;可以让开发者编写、调试和执行Python程序。IDLE包含Python解释器、代码编辑器、调试器和文件…...

【Axure视频教程】输入框控制滑动评分条

今天教大家在Axure里如何制作输入框控制滑动评分条的原型模板&#xff0c;可以通过鼠标左右拖动滑块&#xff0c;也可以点击条形让滑块移动到指定位置&#xff0c;标签和输入框里会返回具体的分值&#xff0c;分值由滑块所在的位置动态计算而成&#xff1b;也可以在输入框里输入…...

【学习笔记】[AGC064C] Erase and Divide Game

有点难&#x1f605;&#xff0c;看到比自己低一级的选手场切这道题就更绷不住了&#x1f607; 考虑 从低到高位 建立 trie \text{trie} trie 树&#xff0c;但是因为是对反串建立的&#xff0c;所以编号连续的点在 trie \text{trie} trie 树上的位置是分散的&#x1f605; …...

算法通关村-----数组中元素出现次数问题

数组中出现次数超过一半的数字 问题描述 数组中有一个数字出现的次数超过数组长度的一半&#xff0c;请找出这个数字。你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。详见剑指offer39 问题分析 最直接的方式就是使用hashMap,遍历给定数组&#xff0c…...

Qt-键盘消息的传递-键盘消息的获取-C++

文章目录 1.概述2.焦点3.强制获取键盘消息4.键盘常用组合方法5.总结 1.概述 QKeyEvent 类用来描述一个键盘事件。当键盘按键被按下或者被释放时&#xff0c;键盘事件便会被发送给拥有键盘输人焦点的部件。 QKeyEvent 的 key() 函数可以获取具体的按键&#xff0c;对于 Qt 中给…...

数据结构与算法(五)--链表概念以及向链表添加元素

一、前言 今天我们学习另一种非常重要的线性数据结构–链表&#xff0c;之前我们已经学习了三种线性数据结构&#xff0c;分别是动态数组&#xff0c;栈和队列。其中队列我们额外学习了队列的另一种实现方式–循环队列。其实我们自己实现过前三个数据结构就知道&#xff0c;它…...

计算机视觉与深度学习-图像分割-视觉识别任务02-目标检测-【北邮鲁鹏】

目录标题 参考目标检测定义深度学习对目标检测的作用单目标检测多任务框架多任务损失预训练模型姿态估计 多目标检测问题滑动窗口&#xff08;Sliding Window&#xff09;滑动窗口缺点 AdaBoost&#xff08;Adaptive Boosting&#xff09;参考 区域建议 selective search 思想慢…...

Flink——Flink检查点(checkpoint)、保存点(savepoint)的区别与联系

Flink checkpoint Checkpoint是Flink实现容错机制最核心的功能&#xff0c;能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot&#xff0c;从而将这些状态数据定期持久化存储下来&#xff0c;从而将这些状态数据定期持久化存储下来&#xff0c;当Flink程序一…...

[篇五章五]-如何禁用 Windows Defender-我的创作纪念日

################################################## 目录 禁用掉烦人的 Windows Defender 在本地组策略编辑器中禁用 Windows Defende 关闭 Microsoft Defender 防病毒 禁止 Defender 开机自动运行 重新激活 Windows Defender #######################################…...

什么情况下使用微服务?

单体架构图参考网络&#xff1a; 1. 什么是单体应用 单体应用就是将应用程序的所有功能都打包成一个独立的单元&#xff0c;最终以一个WAR包或JAR包存在&#xff0c;没有外部的任何依赖&#xff0c;里面包含DAO、Service、UI等所有的逻辑。 优点&#xff1a; &#xff11;&…...

【Linux】Ubuntu美化主题【教程】

【Linux】Ubuntu美化主题【教程】 文章目录 【Linux】Ubuntu美化主题【教程】1. 安装优化工具Tweak2.下载自己喜欢的主题3. 下载自己喜欢的iconReference 1. 安装优化工具Tweak 首先安装优化工具Tweak sudo apt-get install gnome-tweak-tool安装完毕后在菜单中打开Tweak 然后…...

spring-boot2.x,使用EnableWebMvc注解导致的自定义HttpMessageConverters不可用

在json对象转换方面&#xff0c;springboot默认使用的是MappingJackson2HttpMessageConverter。常规需求&#xff0c;在工程中使用阿里的FastJson作为json对象的转换器。 FastJson SerializerFeatures WriteNullListAsEmpty &#xff1a;List字段如果为null,输出为[],而非nu…...

2023-09-20 Android CheckBox 让文字显示在选择框的左边

一、CheckBox 让文字在选择框的左边 &#xff0c;在布局文件里面添加下面一行就可以。 android:layoutDirection"rtl" 即可实现 android:paddingStart"10dp" 设置框文间的间距 二、使用的是left to right <attr name"layoutDirection">&…...

目标检测YOLO实战应用案例100讲-基于改进YOLOv5的口罩人脸检测

目录 前言 国内外研究现状 目标检测研究发展 国内外口罩人脸检测研究现状...

CentOS7 yum安装报错:“Could not resolve host: mirrorlist.centos.org; Unknown error“

虚拟机通过yum安装东西的时候弹出这个错误&#xff1a; 1、查看安装在本机的网卡 网卡ens33处于disconnected的状态 nmcli d2、输入命令&#xff1a; nmtui3、选择网卡&#xff0c;然后点击edit 4、移动到Automatically connect按空格键选择&#xff0c;然后移动到OK键按空格…...

关于token续签

通常我们会对token设置一个有效期&#xff0c;于是&#xff0c;就有了token续签的问题。由于token并没有续时机制&#xff0c;如果不能及时的替换掉过期的token&#xff0c;可能会拦截用户正常的请求&#xff0c;用户只能重新登录&#xff0c;如果提交的信息量很大&#xff0c;…...

淘宝分布式文件存储系统( 二 ) -TFS

淘宝分布式文件存储系统( 二 ) ->>TFS 目录 : 大文件存储结构哈希链表的结构文件映射原理及对应的API文件映射头文件的定义 大文件存储结构 : 采用块(block)文件的形式对数据进行存储 , 分成索引块,主块 , 扩展块 。所有的小文件都是存放到主块中的 &#xff0c;扩展块…...

Java中synchronized:特性、使用、锁机制与策略简析

目录 synchronized的特性互斥性可见性可重入性 synchronized的使用方法synchronized的锁机制常见锁策略乐观锁与悲观锁重量级锁与轻量级锁公平锁与非公平锁可重入锁与不可重入锁自旋锁读写锁 synchronized的特性 互斥性 synchronized确保同一时间只有一个线程可以进入同步块或…...

记一次clickhouse手动更改分片数异常

背景&#xff1a;clickhouse中之前是1分片1副本&#xff0c;随着数据量增多&#xff0c;想将分片数增多&#xff0c;于是驻场人员手动添加了分片数的节点信息 <clickhouse><!-- 集群配置 --><clickhouse_remote_servers><feihuang_ck_cluster><sha…...

深度学习论文: ISTDU-Net:Infrared Small-Target Detection U-Net及其PyTorch实现

深度学习论文: ISTDU-Net&#xff1a;Infrared Small-Target Detection U-Net及其PyTorch实现 ISTDU-Net&#xff1a;Infrared Small-Target Detection U-Net PDF: https://doi.org/10.1109/LGRS.2022.3141584 PyTorch代码: https://github.com/shanglianlm0525/CvPytorch PyTo…...

图像识别-YOLO V8安装部署-window-CPU-Pycharm

前言 安装过程中发现&#xff0c;YOLO V8一直在更新&#xff0c;现在是2023-9-20的版本&#xff0c;已经和1月份刚发布的不一样了。 eg: 目录已经变了&#xff0c;旧版预测:在ultralytics/yolo/v8/下detect 新版&#xff1a;ultralytics/models/yolo/detect/predict.py 1.安…...

js禁用F1至F12、禁止缩放、取消选中并且取消右键操作、打印、拖拽、鼠标点击弹出自定义信息、禁用开发者工具js

禁用js //禁止缩放 //luwenjie hualun window.addEventListener(mousewheel, function (event) {if (event.ctrlKey true || event.metaKey) {event.preventDefault();} }, {passive: false});//firefox window.addEventListener(DOMMouseScroll, function (event) {if (even…...

Zabbix5.0_介绍_组成架构_以及和prometheus的对比_大数据环境下的监控_网络_软件_设备监控_Zabbix工作笔记

z 这里Zabbix可以实现采集 存储 展示 报警 但是 zabbix自带的,展示 和报警 没那么好看,我们可以用 grafana进行展示,然后我们用一个叫睿象云的来做告警展示, 会更丰富一点. 可以看到 看一下zabbix的介绍. 对zabbix的介绍,这个zabbix比较适合对服务器进行监控 这个是zabbix的…...

百度SEO优化TDK介绍(分析下降原因并总结百度优化SEO策略)

TDK是SEO优化中很重要的部分&#xff0c;包括标题&#xff08;Title&#xff09;、描述&#xff08;Description&#xff09;和关键词&#xff08;Keyword&#xff09;&#xff0c;为百度提供网页内容信息。其中标题是最重要的&#xff0c;应尽量突出关键词&#xff0c;同时描述…...

网站建设的标准/百度 营销推广靠谱吗

1jsonp 一般接口使用jsonp跨域&#xff0c;使用jquery的ajax指定dataType为jsonp即可 $.ajax({async : true,url : "https://api.douban.com/v2/book/search",type : "GET",dataType : "jsonp", // 返回的数据类型&#xff0c;设置为JSONP方式js…...

东莞网站建设方案/seo 资料包怎么获得

(1)你一般怎么建索引的&#xff1f;去my.cnf里配置三个配置打开慢查询日志slow_query_log1慢查询日志存储路径slow_query_log_file/var/log/mysql/log-slow-queries.logSQL执行时间大于3秒&#xff0c;则记录日志long_query_time3首先进行SQL优化然后遵守简历索引规则索引并非越…...

西部数码网站管理助手/上海专业网络推广公司

1.执行whereis mysql会有如下打印&#xff1a;mysql: /usr/bin/mysql /usr/lib64/mysql /usr/include/mysql /usr/share/mysql /usr/share/man/man1/mysql.1.gz2.cd /usr/share/mysql目录下查看存在mysql.server文件3.复制mysql.server文件 到 /etc/init.d/下&#xff0c;命名为…...

b2c网站开发多少钱/二十条优化措施全文

部分内容参考&#xff1a;http://www.aspbc.com/tech/showtech.asp?id1256 在开发的过程中&#xff0c;经常使用window.onload和body onload两种&#xff0c;很少使用document.onreadystatechange&#xff0c;但这次写了一个js&#xff0c;使用window.onload和body.onload都实…...

代替手动修改网站模板标签/seo外链发布平台

一、转换图片格式为SVG 一般APK中都会用到很多图片&#xff0c;一般jpg或者png之类的图片占用的内存都很大&#xff0c;而且为了适配不同大小的屏幕&#xff0c;可能会准备多套内容一样&#xff0c;但是大小不一样的图片&#xff0c;这样就会造成大量内存被浪费。 SVG图片占用…...

广州网站建设推广公司/手机cpu性能增强软件

在实际的生产开发中&#xff0c;我们会接触到很多设计模式的例子&#xff0c;有些可能是你熟悉的&#xff0c;有些可能你不熟悉&#xff0c;或者见也没见过。本文的宗旨就是梳理开发中常见的设计模式&#xff0c;让你对设计模式有一定的认识&#xff0c;从而在生产实际中使用。…...