响应式编程详解,带你熟悉Reactor响应式编程
文章目录
- 一、什么是响应式编程
- 1、Java的流和响应式流
- 2、Java中响应式的使用
- 3、Reactor中响应式流的基本接口
- 4、Reactor中响应式接口的基本使用
- 二、初始Reactor
- 1、Flux和Mono的基本介绍
- 2、引入Reactor依赖
- 3、响应式类型的创建
- 4、响应式类型的组合
- (1)使用mergeWith合并响应式流
- (2)使用zip压缩合并响应式流
- (3)使用zip压缩合并为自定义对象的响应式流
- (4)选择第⼀个反应式类型进⾏发布
- 5、转换和过滤反应式流
- (1)skip操作跳过指定数⽬的消息
- (2)skip()操作的另⼀种形式
- (3)take操作只发布第⼀批指定数量的数据项
- (4)take操作的另一种形式
- (5)filter操作自定义过滤条件
- (6)distinct操作去重
- (7)map操作映射新元素
- (8)flatMap将流转成新的流
- (9)buffer操作现将数据流拆分为小块
- (10)collectList操作也可以将所有数据收集到一个List
- (11)collectMap 操作产生⼀个发布Map的Mono
- 6、在反应式类型上执行逻辑操作
- (1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件
- (2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件
- 7、在反应式类型上使用Subscriber订阅
- (1)使用Subscriber消费消息
- (2)使用Flux的doOnNext处理数据
- 8、使用then来处理完成数据返回
- 写在后面
一、什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
在开发应⽤程序代码时,我们可以编写两种⻛格的代码,即命令式和响应式。
命令式(Imperative)的代码:它由⼀组任务组成,每次只运⾏⼀项任务,每项任务⼜都依赖于前⾯的任务。数据会按批次进⾏处理,在前⼀项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下⼀项处理任务。
响应式(Reactive)的代码:它定义了⼀组⽤来处理数据的任务,但是这些任务可以并⾏地执⾏。每项任务处理数据的⼀部分⼦集,并将结果交给处理流程中的下⼀项任务,同时继续处理数据的另⼀部分⼦集。
Reactor 是⼀个响应式编程库,同时也是Spring家族的⼀部分。它是Spring 5反应式编程功能的基础。
1、Java的流和响应式流
Java的Stream流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使⽤函数来对集合进⾏迭代的⼀种⽅式。
响应式流⽀持异步处理任意⼤⼩的数据集,同样也包括⽆限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。
2、Java中响应式的使用
JDK1.8时,是基于Observer/Observable接口而实现的观察者模式:
ObserverDemo observer = new ObserverDemo();
// 添加观察者
observer.addObserver(new Observer() {@Overridepublic void update(Observable o, Object arg) {System.out.println("发生了变化");}
});
observer.addObserver(new Observer() {@Overridepublic void update(Observable o, Object arg) {System.out.println("收到了通知");}
});
observer.setChanged(); // 数据变化
observer.notifyObservers(); // 通知
JDK9及以后,Observer/Observable接口就被弃用了,取而代之的是Flow类:
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;public class FlowDemo {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义订阅者Subscriber<Integer> subscriber = new Subscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 3. 发布者和订阅者 建立订阅关系publiser.subscribe(subscriber);// 4. 生产数据, 并发布// 这里忽略数据生产过程for (int i = 0; i < 1000; i++) {System.out.println("生成数据:" + i);// submit是个block方法publiser.submit(i);}// 5. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);// debug的时候, 下面这行需要有断点// 否则主线程结束无法debugSystem.out.println();}}
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;/**
* 带 process 的 flow demo
*//**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>implements Processor<Integer, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("处理器接受到数据: " + item);// 过滤掉小于0的, 然后发布出去if (item > 0) {this.submit("转换后的数据:" + item);}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理器处理完了!");// 关闭发布者this.close();}
}public class FlowDemo2 {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisherSubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义处理器, 对数据进行过滤, 并转换为String类型MyProcessor processor = new MyProcessor();// 3. 发布者 和 处理器 建立订阅关系publiser.subscribe(processor);// 4. 定义最终订阅者, 消费 String 类型数据Subscriber<String> subscriber = new Subscriber<String>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 5. 处理器 和 最终订阅者 建立订阅关系processor.subscribe(subscriber);// 6. 生产数据, 并发布// 这里忽略数据生产过程publiser.submit(-111);publiser.submit(111);// 7. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);}}
3、Reactor中响应式流的基本接口
响应式流规范可以总结为4个接⼝:Publisher、Subscriber、Subscription和Processor。
Publisher负责⽣成数据,并将数据发送给 Subscription(每个Subscriber对应⼀个Subscription)。
public interface Publisher<T> {// Publisher接⼝声明了⼀个⽅法 subscribe(),Subscriber可以通过该⽅法向 Publisher发起订阅。public void subscribe(Subscriber<? super T> s);
}
⼀旦Subscriber订阅成功,就可以接收来⾃Publisher的事件。
public interface Subscriber<T> {// Subscriber的第⼀个事件是通过对 onSubscribe()⽅法的调⽤接收的。public void onSubscribe(Subscription s);// 每个数据项都会通过该方法处理public void onNext(T t);// 异常处理public void onError(Throwable t);// 结束public void onComplete();
}
Publisher调⽤ onSubscribe() ⽅法时,会将Subscription对象传递给 Subscriber。
通过Subscription,Subscriber可以管理其订阅情况:
public interface Subscription {// Subscriber可以通过调⽤ request()⽅法来请求 Publisher 发送数据,可以传⼊⼀个long类型的数值以表明它愿意接受多少数据// 这也是回压能够发挥作⽤的地⽅,以避免Publisher 发送多于 Subscriber能够处理的数据量public void request(long n);// 调⽤ cancel()⽅法表明它不再对数据感兴趣并且取消订阅public void cancel();
}
Subscriber 请求数据之后,数据就会开始流经响应式流,调用onNext方法。
Processor接⼝,它是Subscriber和Publisher的组合:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
4、Reactor中响应式接口的基本使用
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;public class ReactorDemo {public static void main(String[] args) {// reactor = jdk8 stream + jdk9 reactive stream// Mono 0-1个元素// Flux 0-N个元素String[] strs = { "1", "2", "3" };// 2. 定义订阅者Subscriber<Integer> subscriber = new Subscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 这里就是jdk8的streamFlux.fromArray(strs).map(s -> Integer.parseInt(s))// 最终操作// 这里就是jdk9的reactive stream.subscribe(subscriber);}
}
二、初始Reactor
1、Flux和Mono的基本介绍
Reactor中有两个核心类,Mono和Flux。Flux和Mono是Reactor提供的最基础的构建块,⽽这两种响应式类型所提供的操作符则是组合使⽤它们以构建数据流动管线的黏合剂。
这两个类实现接口Publisher,提供丰富操作符。Flux对象实现发布者,返回N个元素Mono实现发布者,返回0或者1个元素。
Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号:元素值、错误信号、完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
Flux和Mono共有500多个操作,这些操作都可以⼤致归类为:创建操作;组合操作;转换操作;逻辑操作。
注意!Mono和Flux的很多操作是相同的,只不过对应的数据数量不同,所以本文更多的操作都是基于Flux的,Mono也同理。
2、引入Reactor依赖
需要引入reactor-core核心包和测试包。
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.x.x</version>
</dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><version>3.x.x</version><scope>test</scope>
</dependency>
3、响应式类型的创建
Reactor提供了多种创建Flux和Mono的操作。
// 使⽤Flux或Mono上的静态 just()⽅法来创建⼀个响应式类型
Mono.just(1);
Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 调用just或其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。
// 添加一个订阅者,subscribe的方法参数相当于是一个Consumer
fruitFlux.subscribe(f -> System.out.println("Here's some fruit: " + f)
);// 根据集合创建
String[] fruits = new String[] {"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux2 = Flux.fromArray(fruits);
List<String> list = Arrays.asList(fruits);
Flux.fromIterable(list); // 集合Stream<String> stream = list.stream();
Flux.fromStream(stream); // stream流// 根据区间创建1-5
Flux<Integer> intervalFlux =Flux.range(1, 5);
intervalFlux.subscribe(f -> System.out.println("data is :" + f)
);
// 每秒发布⼀个值的Flux,通过interval()⽅法创建的Flux会从0开始发布值,并且后续的条⽬依次递增。
// 因为interval()⽅法没有指定最⼤值,所以它可能会永远运⾏。我们也可以使⽤take()⽅法将结果限制为前5个条⽬。
Flux<Long> intervalFlux2 =Flux.interval(Duration.ofSeconds(1)).take(5);
intervalFlux2.subscribe(f -> System.out.println("data2 is :" + f)
);// 阻塞,等待结果
Thread.sleep(100000);
4、响应式类型的组合
(1)使用mergeWith合并响应式流
Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa").delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples").delaySubscription(Duration.ofMillis(250)) // 订阅后250毫秒后开始发布数据.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据// 使⽤mergeWith()⽅法,将两个Flux合并,合并过后的Flux数据项发布顺序与源Flux的发布时间⼀致
// Garfield Lasagna Kojak Lollipops Barbossa Apples
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);mergedFlux.subscribe(System.out::println);// 阻塞,等待结果
Thread.sleep(100000);
我们发现,使用mergeWith合并过的两个FLux,并没有严格意义上的先后之分,谁产生了数据就接着消费,与同一个无异。
(2)使用zip压缩合并响应式流
Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");
// 当两个Flux对象压缩在⼀起的时候,它将会产⽣⼀个新的发布元组的Flux,其中每个元组中都包含了来⾃每个源Flux的数据项
// 这个合并后的Flux发出的每个条⽬都是⼀个Tuple2(⼀个容纳两个其他对象的容器对象)的实例,其中包含了来⾃每个源Flux的数据项,并保持着它们发布的顺序。
Flux<Tuple2<String, String>> zippedFlux =Flux.zip(characterFlux, foodFlux);zippedFlux.subscribe(t -> {System.out.println(t.getT1() + "|" + t.getT2());
});
/*** 执行结果:* Garfield|Lasagna* Kojak|Lollipops* Barbossa|Apples*/
(3)使用zip压缩合并为自定义对象的响应式流
如果你不想使⽤Tuple2,⽽想要使⽤其他类型,就可以为zip()⽅法提供⼀个合并函数来⽣成你想要的任何对象,合并函数会传⼊这两个数据项。
zip操作的另⼀种形式(从每个传⼊Flux中各取⼀个元素,然后创建消息对象,并产⽣这些消息组成的Flux)
Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");// 压缩成自定义对象
Flux<String> zippedFlux =Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
zippedFlux.subscribe(System.out:: println);/*** 执行结果:* Garfield eats Lasagna* Kojak eats Lollipops* Barbossa eats Apples*/
(4)选择第⼀个反应式类型进⾏发布
假设我们有两个Flux对象,此时我们不想将它们合并在⼀起,⽽是想要创建⼀个新的Flux,让这个新的Flux从第⼀个产⽣值的Flux中发布值。first()操作会在两个Flux对象中选择第⼀个发布值的Flux,并再次发布它的值。
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth").delaySubscription(Duration.ofMillis(100)); // 延迟100ms
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
// 选择第⼀个反应式类型进⾏发布
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
firstFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/*** 执行结果:* hare* cheetah* squirrel*/
5、转换和过滤反应式流
在数据流经⼀个流时,我们通常需要过滤掉某些值并对其他的值进⾏处理。
(1)skip操作跳过指定数⽬的消息
skip操作跳过指定数⽬的消息并将剩下的消息继续在结果Flux上进⾏传递
// 跳过3个,并创建一个新的Flux
Flux<String> skipFlux = Flux.just("one", "two", "skip a few", "ninety nine", "one hundred").skip(3);
skipFlux.subscribe(System.out::println);
/*** 执行结果* ninety nine* one hundred*/
(2)skip()操作的另⼀种形式
在⼀段时间之内跳过所有的第⼀批数据。
// 这是skip()操作的另⼀种形式,将会产⽣⼀个新Flux,在发布来⾃源Flux的数据项之前等待指定的⼀段时间
Flux<String> skipFlux = Flux.just("one", "two", "skip a few", "ninety nine", "one hundred").delayElements(Duration.ofSeconds(1)) // 每1秒一个.skip(Duration.ofSeconds(4)); // 4秒前的都跳过
skipFlux.subscribe(System.out::println);// 阻塞,等待结果
Thread.sleep(100000);/*** 执行结果:* ninety nine* one hundred*/
(3)take操作只发布第⼀批指定数量的数据项
根据对skip操作的描述来看,take可以认为是与skip相反的操作。skip操作会跳过前⾯⼏个数据项,⽽take操作只发布第⼀批指定数量的数据项,然后将取消订阅。
// take操作只发布传⼊Flux中前⾯指定数⽬的数据项,然后将取消订阅
Flux<String> nationalParkFlux = Flux.just("Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton").take(3);
nationalParkFlux.subscribe(System.out::println);
/*** 执行结果:* Yellowstone* Yosemite* Grand Canyon*/
(4)take操作的另一种形式
take()⽅法也有另⼀种替代形式,基于间隔时间⽽不是数据项个数(在指定的时间过期之前,⼀直将消息传递给结果Flux)。它将接受并发布与源Flux⼀样多的数据项,直到某段时间结束,之后Flux将会完成。
// 在订阅之后的前3.5秒发布数据条⽬。
Flux<String> nationalParkFlux = Flux.just("Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton").delayElements(Duration.ofSeconds(1)).take(Duration.ofMillis(3500));
nationalParkFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/*** 执行结果:* Yellowstone* Yosemite* Grand Canyon*/
(5)filter操作自定义过滤条件
filter操作允许我们根据任何条件进⾏选择性地发布。
Flux<String> nationalParkFlux = Flux.just("Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton").filter(np -> !np.contains(" ")); // 过滤携带空格的
nationalParkFlux.subscribe(System.out::println);
/*** 执行结果* Yellowstone* Yosemite* Zion*/
(6)distinct操作去重
Flux<String> animalFlux = Flux.just("dog", "cat", "bird", "dog", "bird", "anteater").distinct();
// 去重
animalFlux.subscribe(System.out::println);
/*** 执行结果:* dog* cat* bird* anteater*/
(7)map操作映射新元素
map将元素映射为新的元素,并创建一个新的Flux。
// map将元素映射为新的元素,并创建一个新的Flux
Flux<Integer> integerFlux = Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr").map(n -> {String[] split = n.split("\\s");return split.length; // 将String转为Integer});
integerFlux.subscribe(System.out::println);/*** 执行结果:* 2* 2* 2*/
其中重要的⼀点是:在每个数据项被源Flux发布时,map操作是同步执⾏的,如果你想要异步地转换过程,那么你应该考虑使⽤flatMap操作。
(8)flatMap将流转成新的流
flatMap并不像map操作那样简单地将⼀个对象转换到另⼀个对象,⽽是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()⽅法结合使⽤时,flatMap操作可以释放Reactor反应式的异步能⼒。
// 使⽤flatMap()⽅法和subscribeOn()⽅法
Flux<Integer> integerFlux = Flux.just("Michael", "Scottie Pippen", "Steve Kerr Ob").flatMap(n -> Mono.just(n).map(p -> {String[] split = p.split("\\s");return split.length; // 将String转为Integer}).subscribeOn(Schedulers.parallel()) // 定义异步);
integerFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
(9)buffer操作现将数据流拆分为小块
buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
// buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
Flux<String> fruitFlux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
// 创建⼀个新的包含List 集合的Flux,其中每个List只有不超过指定数量的元素
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3); // 数据切分为小块,每3个一块
bufferedFlux.subscribe(System.out::println);
/*** 执行结果:* [apple, orange, banana]* [kiwi, strawberry]*/
// 可以分片后并行执行
bufferedFlux.flatMap(x ->Flux.fromIterable(x).map(y -> y.toUpperCase()).subscribeOn(Schedulers.parallel())
).subscribe(l -> {System.out.println(Thread.currentThread().getName() + "线程执行:" + l);
});
/*** 执行结果(因为并行执行,结果可能不一致):* parallel-1线程执行:APPLE* parallel-1线程执行:ORANGE* parallel-1线程执行:BANANA* parallel-2线程执行:KIWI* parallel-2线程执行:STRAWBERRY*/
// 阻塞,等待结果
Thread.sleep(100000);
使⽤不带参数的buffer()⽅法可以将Flux发布的所有数据项都收集到⼀个List中:
Flux<List<String>> bufferedFlux = fruitFlux.buffer();
(10)collectList操作也可以将所有数据收集到一个List
collectList操作将产⽣⼀个包含传⼊Flux发布的所有消息的Mono。
Flux<String> fruitFlux = Flux.just("apple", "orange", "banana", "kiwi", "strawberry");
// 生成一个Mono,里面包含一个List
Mono<List<String>> fruitListMono = fruitFlux.collectList();
(11)collectMap 操作产生⼀个发布Map的Mono
collectMap操作将会产⽣⼀个Mono(包含了由传⼊Flux所发出的消息产⽣的Map,这个Map的key是从传⼊消息的某些特征衍⽣⽽来的)
Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =animalFlux.collectMap(a -> a.charAt(0)); // 将第一个字符作为Map的key
animalMapMono.subscribe(System.out::println);
/*** 执行结果:* {a=aardvark, e=eagle, k=kangaroo}*/// 阻塞,等待结果
Thread.sleep(100000);
key相同的,会被覆盖。
6、在反应式类型上执行逻辑操作
(1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件
Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
都满足条件会返回true,否则返回false。
(2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件
Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));
至少有一个满足条件,就为true,都不满足就为false。
7、在反应式类型上使用Subscriber订阅
(1)使用Subscriber消费消息
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");stringFlux.subscribe(new Subscriber<String>() {// 保存订阅关系, 需要用它来给发布者响应private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {System.out.println("订阅者开始订阅");this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("订阅者开始处理数据" + item);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable t) {// 出现了异常(例如处理数据的时候产生了异常)t.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("订阅者处理完了!");}
});
/*** 执行结果:* 订阅者开始订阅* 订阅者开始处理数据Apple* 订阅者开始处理数据Orange* 订阅者开始处理数据Grape* 订阅者开始处理数据Banana* 订阅者开始处理数据Strawberry* 订阅者处理完了!*/// 阻塞
Thread.sleep(10000);
(2)使用Flux的doOnNext处理数据
Flux的doOnNext,会添加当Flux发出一个项目时触发的行为(副作用)。
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t)).subscribe(t -> System.out.println("订阅者处理数据:" + t));
/*** 执行结果:* 发布者处理数据:Apple* 订阅者处理数据:Apple* 发布者处理数据:Orange* 订阅者处理数据:Orange* 发布者处理数据:Grape* 订阅者处理数据:Grape* 发布者处理数据:Banana* 订阅者处理数据:Banana* 发布者处理数据:Strawberry* 订阅者处理数据:Strawberry*/// 阻塞
Thread.sleep(10000);
但是!以下写法是不会触发发布者的doOnNext事件的:
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t));
stringFlux.subscribe(t -> System.out.println("订阅者处理数据:" + t));
只有链式调用,才会触发发布者的doOnNext事件。
doOnNext可以写多个,顺序执行:
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者1处理数据:" + t)).doOnNext(t -> System.out.println("发布者2处理数据:" + t)).subscribe(t -> System.out.println("订阅者处理数据:" + t));
/*** 执行结果:* 发布者1处理数据:Apple* 发布者2处理数据:Apple* 订阅者处理数据:Apple* 发布者1处理数据:Orange* 发布者2处理数据:Orange* 订阅者处理数据:Orange* 发布者1处理数据:Grape* 发布者2处理数据:Grape* 订阅者处理数据:Grape* 发布者1处理数据:Banana* 发布者2处理数据:Banana* 订阅者处理数据:Banana* 发布者1处理数据:Strawberry* 发布者2处理数据:Strawberry* 订阅者处理数据:Strawberry*/
8、使用then来处理完成数据返回
Flux<String> just = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 返回一个Mono ,在此Flux完成时完成。这将主动忽略序列,只重放完成或错误信号。
just.doOnNext(t -> System.out.println("发布者处理数据:" + t)).then(Mono.defer(() -> {return Mono.just("我完成了");})).subscribe(t -> System.out.println("订阅者处理数据:" + t));
/*** 执行结果:* 发布者处理数据:Apple* 发布者处理数据:Orange* 发布者处理数据:Grape* 发布者处理数据:Banana* 发布者处理数据:Strawberry* 订阅者处理数据:我完成了*/
通常来说,发布者发布完之后,都需要调用then来处理数据,或调用thenEmpty返回一个空的Mono(Mono.empty())。
写在后面
如果本文对你有帮助,请点赞收藏关注一下吧 ~
相关文章:
响应式编程详解,带你熟悉Reactor响应式编程
文章目录一、什么是响应式编程1、Java的流和响应式流2、Java中响应式的使用3、Reactor中响应式流的基本接口4、Reactor中响应式接口的基本使用二、初始Reactor1、Flux和Mono的基本介绍2、引入Reactor依赖3、响应式类型的创建4、响应式类型的组合(1)使用m…...
踩坑篇之WebSocket实现类中无法使用@Autowired注入对象
大家好,我是小简,今天我又大意了,在WebSocket这个类上踩坑了。 接下来我讲讲我踩坑的经历吧! package cn.donglifeng.shop.socket.endpoin;import cn.donglifeng.shop.common.context.SpringBeanContext; import cn.donglifeng.s…...
QT CTK插件框架 (一 下载编译)
CTK 为支持生物医学图像计算的公共开发包,其全称为 Common Toolkit。为医学成像提供一组统一的基本功能;促进代码和数据的交互及结合;避免重复开发;在工具包(医学成像)范围内不断扩展到新任务,而…...
【Java版oj】day10 井字棋、密码强度等级
目录 一、井字棋 (1)原题再现 (2)问题分析 (3)完整代码 二、密码强度等级 (1)原题再现 (2)问题分析 (3)完整代码 一、井字棋 &a…...
JavaScript的事件传播机制
你在学习和编写JavaScript时可能听说过事件冒泡(event bubbling)。它会发生在多个元素存在嵌套关系,并且这些元素都注册了同一事件(例如click)的监听器时。 但是事件冒泡只是事件机制的一部分。它经常与事件捕获(event capturing)和事件传播…...
队列的定义及基本操作实现(链式)
个人主页:【😊个人主页】 系列专栏:【❤️数据结构与算法】 学习名言:天子重英豪,文章教儿曹。万般皆下品,惟有读书高 系列文章目录 第一章 ❤️ 学前知识 第二章 ❤️ 单向链表 第三章 ❤️ 递归 文章目录…...
集成方法!
目录 关注降低variance,选择bias较小的基学习器 Bagging Stacking Random Forest 关注降低bias,选择variance较小的基学习器 Adaboost Boosting 关注降低variance,选择bias较小的基学习器 Bagging 给定m个样本的数据集,利用有放回的随机采样法,得…...
20年程序员生涯,读了200多本技术书,挑了几本精华好书分享给大家
不知不觉已经又走过了20个年头了,今年已经44了,虽然我已经退休在家,但一直都保持着读书的习惯,我每年平均要读10本技术书籍,保持不让自己的技术落伍。 这些年读的技术书不下200本,很多好书我都会保存在家&a…...
C++ 手写一个WebServer
文章目录 前言一、WebServer的原理刨析二、HTTP协议基础三、C++代码实战四、运行测试前言 本文由:我不会画饼呀 提供建议 大家如果有什么想看的文章(想了解的知识点),都可以在本专栏文章底部评论,或者私信我,在有能力的前提下,我都会尽量给大家写出来,供大家学习参考 …...
Elasticsearch 简介与安装
简介 Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上。 Lucene 可以说是当下最先进、高性能、全功能的搜索引擎库—无论是开源还是私有。 但是 Lucene 仅仅只是一个库。为了充分发挥其功能,你需要使用 Java…...
Qt5.12实战之QByteArray与字符指针及字符串转换
示例源码:#include <QCoreApplication> #include <QDebug> #include <QTextStream> static QTextStream cout (stdout,QIODevice::WriteOnly); #include <iostream> #include <QtGlobal> #include <QByteArray>void test() {qDebug() <…...
二、ElasticSearch基础语法
目录一、简单了解ik分词器(分词效果)1.standard(单字分词器,es默认分词器)2.ik_smart分词(粗粒度的拆分)3.ik_max_word分词器(最细粒度拆分)二、指定默认分词器1.为索引指定默认分词器三、ES操作数据1.概述2.创建索引3.查询索引4.删除索引5.添…...
Yolov8详解与实战
文章目录摘要模型详解C2F模块Losshead部分模型实战训练COCO数据集下载数据集COCO转yolo格式数据集(适用V4,V5,V6,V7,V8)配置yolov8环境训练测试训练自定义数据集Labelme数据集摘要 YOLOv8 是 ultralytics …...
多线程案例——阻塞队列
目录 一、阻塞队列 1. 生产者消费者模型 (1)解耦合 (2)“削峰填谷” 2. 标准库中的阻塞队列 3. 自己实现一个阻塞队列(代码) 4. 自己实现生产者消费者模型(代码) 一、阻塞队列…...
学习优秀博文(【国产MCU移植】手把手教你使用RT-Thread制作GD32系列BSP)有感 | 文末赠书5本
学习优秀博文(【guo产MCU移植】手把手教你使用RT-Thread制作GD32系列BSP)有感 一篇优秀的博文是什么样的?它有什么规律可循吗?优秀的guo产32位单片机处理器是否真的能成功替换掉stm32的垄断地位? 本文博主以亲身经历聊…...
写用例写的焦头烂额?看看摸鱼5年的老点工是怎么写的...
给你个需求,你要怎么转变成最终的用例? 直接把需求文档翻译一下就完事了。 老点工拿到需求后的标准操作: 第一步:解析需求 先解析需求-找出所有需求中的动词,再列出所有测试点。测试点过程不断发散,对于…...
基于深度学习的鸟类检测识别系统(含UI界面,Python代码)
摘要:鸟类识别是深度学习和机器视觉领域的一个热门应用,本文详细介绍基于YOLOv5的鸟类检测识别系统,在介绍算法原理的同时,给出Python的实现代码以及PyQt的UI界面。在界面中可以选择各种鸟类图片、视频以及开启摄像头进行检测识别…...
零基础搭建Tomcat集群(超详细)
💗推荐阅读文章💗 🌸JavaSE系列🌸👉1️⃣《JavaSE系列教程》🌺MySQL系列🌺👉2️⃣《MySQL系列教程》🍀JavaWeb系列🍀👉3️⃣《JavaWeb系列教程》…...
机器学习自学笔记——聚类
聚类的基本概念 聚类,顾名思义,就是将一个数据集中各个样本点聚集成不同的“类”。每个类中的样本点都有某些相似的特征。比如图书馆中,会把成百上千的书分成不同的类别:科普书、漫画书、科幻书等等,方便人们查找。每…...
注意下C语言整形提升
C语言整形提升 C语言整形提升是指在表达式中使用多种类型的数据时,编译器会自动将较小的类型转换为较大的类型,以便进行运算。在C语言中,整型提升规则如下: 如果表达式中存在short类型,则将其自动转换为int类型。 如…...
Go panic的学习
一、前言 我们的应用程序常常会出现异常,包括由运行时检测到的异常或者应用开发者自己抛出的异常。 异常在一些其他语言中,如c、java,被叫做Exception,主要由抛出异常和捕获异常两部分组成。异常在go语言中,叫做pani…...
讲解Linux中samba理论讲解及Linux共享访问
♥️作者:小刘在C站 ♥️个人主页:小刘主页 ♥️每天分享云计算网络运维课堂笔记,努力不一定有收获,但一定会有收获加油!一起努力,共赴美好人生! ♥️夕阳下,是最美的绽放࿰…...
【C++笔试强训】第三十二天
🎇C笔试强训 博客主页:一起去看日落吗分享博主的C刷题日常,大家一起学习博主的能力有限,出现错误希望大家不吝赐教分享给大家一句我很喜欢的话:夜色难免微凉,前方必有曙光 🌞。 💦&a…...
OpenAI GPT-4震撼发布:多模态大模型
OpenAI GPT-4震撼发布:多模态大模型发布要点GPT4的新功能GPT-4:我能玩梗图GPT4:理解图片GPT4:识别与解析图片内容怎样面对GPT4申请 GPT-4 API前言: 🏠个人主页:以山河作礼。 📝📝:本文章是帮助大家更加了…...
手把手教你 在linux上安装kafka
目录 1. 准备服务器 2. 选一台服务器配置kafka安装包 2.1 下载安装包 2.2 解压安装包 2.3 修改配置文件 3. 分发安装包到其他机器 4. 修改每台机器的broker.id 5. 配置环境变量 6. 启停kafka服务 6.1 启动kafak服务 6.2 停止kafka服务 1. 准备服务器 1.买几台云服务…...
Spring Cloud(微服务)学习篇(五)
Spring Cloud(微服务)学习篇(五) 1 nacos配置文件的读取 1.1 访问localhost:8848/index.html并输入账户密码后进入nacos界面并点击配置列表 1.2 点击右侧的号 1.3 点击加号后,进入新建配置界面,并做好如下配置 1.4 往下翻动,点击发布按钮 1.5 发布成功后的界面 1.6 在pom.xml…...
道阻且长,未来可期,从GPT-4窥得通用人工智能时代的冰山一角!
大家这两天是不是又被满屏的ChatGPT相关的文章信息给轰炸得不轻,说实话,我真的对ChatGPT的热度如此经久不衰这个问题非常感兴趣。从去年刚面世时,小范围内造成的行业震荡,到今年二月份铺天盖地得铺舆论造势,引发全民热…...
百度将?百度已!
仿佛一夜之间,创业公司OpenAI旗下的ChatGPT就火遍全球。这是一场十分罕见的科技盛宴。下到普通用户,上到各科技大厂都在讨论ChatGPT的前景,国外的微软、谷歌,国内的百度、腾讯、阿里等等都在布局相关业务。比尔盖茨更是称ChatGPT与…...
内核实验(三):编写简单Linux内核模块,使用Qemu加载ko做测试
文章目录一、篇头二、QEMU:挂载虚拟分区2.1 创建 sd.ext4.img 虚拟分区2.2 启动 Qemu2.3 手动挂载 sd.ext4.img三、实现一个简单的KO3.1 目录文件3.2 Makefile3.3 编译3.3.1 编译打印3.3.2 生成文件3.4 检查:objdump3.4.1 objdump -dS test\_1.ko3.4.2 o…...
女子举重问题
一、问题的描述 问题及要求 1、搜集各个级别世界女子举重比赛的实际数据。分别建立女子举重比赛总成绩的线性模型、幂函数模型、幂函数改进模型,并最终建立总冠军评选模型。 应用以上模型对最近举行的一届奥运会女子举重比赛总成绩进行排名,并对模型及…...
jsp个人网站设计/百度搜索排名机制
腾讯面试复盘,总结了一下,面试总共是问了七个方面的问题(仅仅是个人面试经历,后台开发岗),包含:数据库、数据结构、JVM、网络、JAVA、分布式、操作系统等七个模块,下面就给大家介绍一…...
怎么创建网站 优帮云/种子搜索引擎
故障现象一、推拉式机库门电源指示灯不亮原因:1指示灯坏;2外部电源未输入;3开关电源损坏处理方法:1更换指示灯;2接通电源(测量正常)故障现象二、推拉式机库门有电源不能运转原因:1电源相序错误;…...
济南建设主管部门网站/宁波网站快速优化
近日,IBM宣布筹建中的认知计算系统研究中心(C3SR)将安家于美国伊利诺伊大学厄巴纳分校的工程学院。 C3SR计划在今年夏季正式启动,将构建以IBM Watson技术为基础的综合认知计算系统,对包括视频、讲稿、作业和教材等大量…...
做商城网站要什么手续/赣州seo外包
导语:算法是指解题方案的准确而完整的描述,是一系列解决问题的清晰指令,算法代表着用系统的方法描述解决问题的策略机制。下面是YJBYS小编收集整理的有关计算机算法的英语词汇,欢迎参考!Median and Selection 中位数Generating Pe…...
阿里万网怎么做网站/优化大师有必要安装吗
在GameViewController.swift中重载prefersStatusBarHidden方法,返回true override func prefersStatusBarHidden() -> Bool {return true } 转载于:https://www.cnblogs.com/sandal1980/p/3819074.html...
病毒疫情/信阳网站seo
<script type"text/javascript">//js的继承实现function Person() {// 定义基类的属性和方法this.name;this.age;this.say function () {alert(my name is: this.name);}this.showAge function () {alert(my age is: this.age);}}function Stu() {// 定义派…...