当前位置: 首页 > news >正文

响应式操作实战案例

Project Reactor 框架

在Spring Boot 项目 Maven 中添加依赖管理。

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId>
</dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope>
</dependency>

如果想在Spring Boot 项目中使用 Reactor,那么你需要在构建中设置 Reactor 的 BOM(物料清单)。下面的依赖管理条目增加了 Reactor 的 Bismuth-RELEASE 到构建中:

<dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>Bismuth-RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

Reactor 异步数据序列

       响应式流规范的基本组件是一个异步的数据序列,在 Reactor 框架中,我们可以把这个异步数据序列表示成如下形式:

上面的异步数据序列可以用下面的公式来表示:

onNext x 0..N [onError | onComplete]
  • onNext:表示正常的包含元素的消息通知

  • onComplete:表示序列结束的消息通知

  • onError:表示序列出错的消息通知

        当触发这些消息通知时,异步序列的订阅者中对应的这三个同名方法将被调用。正常情况下,onNext() 和 onComplete() 方法都应该被调用,用来正常消费数据并结束序列。如果没有调用 onComplete() 方法就会生成一个无界数据序列,在业务系统中,这通常是不合理的。而 onError() 方法只有序列出现异常时才会被调用。 

       基于上述异步数据序列,Reactor 框架提供了两个核心组件来发布数据,分别是 Flux 和 Mono 组件。这两个组件可以说是应用程序开发过程中最基本的编程对象,这两个组件非常重要,理解清楚它两,Reactor 响应式编程才算进入门槛。

 Flux 和 Mono 组件

Flux 代表的是一个包含 0 到 n 个元素的异步序列,如下:

  • Flux 是一个标准 Publisher,表示0到N个发射项的异步序列,可选地以完成信号或错误终止。与 Reactive Streams 规范中一样,这三种类型的信号转换为对下游订阅者的 onNext、onComplete 或 onError 方法的调用。

  • 在这种大范围的可能信号中,Flux 是通用的 reactive 类型。注意,所有事件,甚至终止事件,都是可选的:没有 onNext 事件,但是 onComplete 事件表示一个空的有限序列,但是移除 onComplete 并且你有一个无限的空序列(除了关于取消的测试之外,没有特别有用)。同样,无限序列不一定是空的。例如,Flux.interval(Duration) 产生一个 Flux,它是无限的,从时钟发出规则的数据。

Mono 代表的是一个包含 0 到 1 个元素的异步序列,如下:

  • Mono 是一个专门的 Publisher,它最多发出一个项,然后可选地以 onComplete 信号或 onError 信号结束。

  • 它只提供了可用于 Flux 的操作符的子集,并且一些操作符(特别是那些将 Mono 与另一个发布者组合的操作符)切换到 Flux。

  • 例如,Mono#concatWith(Publisher) 返回一个 Flux ,而 Mono#then(Mono) 则返回另一个 Mono。

  • 注意,Mono 可以用于表示只有完成概念(类似于Runnable)的无值异步进程。若要创建一个,请使用 Mono

响应式操作实战 

通过 Flux 对象创建响应式流

主要有以下两大类:

  • 基于各种工厂模式的静态创建方法;

  • 采用编程的方式动态创建 Flux;

 基于各种工厂模式的静态创建方法 

Reactor 中静态创建 Flux 的方法常见的包括 just()、range()、interval() 以及各种以 from- 为前缀的方法组等。

  • just() 方法

它可以指定序列中包含的全部元素,创建出来的 Flux 序列在发布这些元素之后会自动结束。一般情况下,在已知元素数量和内容时,使用 just() 方法是创建 Flux 的最简单直接的做法。 

Flux.just("Apple", "Orange", "Grape", "Banana","Strawberry").subscribe(System.out::println);

 控制台输出:

Apple
Orange
Grape
Banana
Strawberry

 这里要想控制台有输出,必须要调用 subscribe 方法,Flux 要是没有订阅者,数据就不会流动。以花园软管的思路进行类比,你已经把软管接到出水口了,另一端就是从自来水公司流出的水。但是水不会流动,除非你打开水龙头。对响应式类型的订阅就是打开数据流的方式。

subscribe() 中的 lambda 表达式实际上是 java.util.Consumer,用于创建响应式流的 Subscriber。由于调用了 subscribe() 方法,数据开始流动了。在这个例子中,不存在中间操作,因此数据直接从 Flux 流到了 Subscriber。

  • fromXXX() 方法组

如果我们已经有了一个数组、一个 Iterable 对象或 Stream 对象,那么就可以通过 Flux 提供的 fromXXX() 方法组来从这些对象中自动创建 Flux,包括 fromArray()、fromIterable() 和 fromStream() 方法。

 

// fromArray()
String[] fruits = new String[] {"Apple", "Orange", "Grape", "Banana", "Strawberry"};
Flux.fromArray(fruits).subscribe(System.out::println);// fromIterable()
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux.fromIterable(fruitList).subscribe(System.out::println);// fromStream() 
Stream<String> fruitStream =Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux.fromStream(fruitStream).subscribe(System.out::println);

 三个方法控制台都是输出:

Apple
Orange
Grape
Banana
Strawberry
  • range() 方法

有时你没有任何数据可供使用,只需要使用 Flux 作为计数器,发出一个随每个新值递增的数字。要创建计数器 Flux,可以使用静态 range() 方法。

Flux.range(1, 5).subscribe(System.out::println);
  • interval() 方法

在 Reactor 框架中,interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列。通过 interval() 所具备的一组重载方法,我们可以分别指定这个数据序列中第一个元素发布之前的延迟时间,以及每个元素之间的时间间隔。

图中每个元素发布时相当于添加了一个定时器的效果。使用 interval() 方法的示例代码如下所示:

Flux.interval(Duration.ofSeconds(2), Duration.ofMillis(200)).subscribe(System.out::println);

 这段代码的执行效果相当于在等待 2 秒钟之后,生成一个从 0 开始逐一递增的无界数据序列,每 200 毫秒推送一次数据。

  • empty()、error() 和 never()

我们可以分别使用 empty()、error() 和 never() 这三个方法类创建一些特殊的数据序列。其中,如果你希望创建一个只包含结束消息的空序列,那么可以使用 empty() 方法,使用示例如下所示。显然,这时候控制台应该没有任何的输出结果。

Flux.empty().subscribe(System.out::println);

然后,通过 error() 方法可以创建一个只包含错误消息的序列。如果你不希望所创建的序列不发出任何类似的消息通知,也可以使用 never() 方法实现这一目标。当然,这几个方法都比较少用,通常只用于调试和测试。

不难看出,静态创建 Flux 的方法简单直接,一般用于生成那些事先已经定义好的数据序列。而如果数据序列事先无法确定,或者生成过程中包含复杂的业务逻辑,那么就需要用到动态创建方法。

 采用编程的方式动态创建 Flux

动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 generate() 方法和 create() 方法。

  • generate() 方法

generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件,定义如下:

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

 SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。这里要注意的是 next() 方法只能最多被调用一次。使用 generate() 方法创建 Flux 的示例代码如下:

Flux.generate(sink -> {sink.next("splendor.s");sink.complete();
}).subscribe(System.out::println);

运行代码控制台会打印“splendor.s”,我们在这里调用了一次 next() 方法,并通过 complete() 方法结束了这个数据流,如果不调用 complete() 方法,那么就会生成一个所有元素均为“splendor.s”的无界数据流。

如果想要在序列生成过程中引入状态,那么可以使用如下所示的 generate() 方法重载。

Flux.generate(() -> 1, (i, sink) -> {sink.next(i);if (i == 5) {sink.complete();}return ++i;
}).subscribe(System.out::println);

这里引入了一个代表中间状态的变量 i,然后根据 i 的值来判断是否终止序列。显然,以上代码的执行效果会在控制台中输入 1 到 5 这 5 个数字。 

  • create()

我们再来看下 create() 方法,定义如下:

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素。使用 create() 方法创建 Flux 的示例代码如下:

Flux.create(sink -> {for (int i = 0; i < 5; i++) {sink.next("splendor.s" + i);}sink.complete();
}).subscribe(System.out::println);

运行代码控制台会打印“微splendor.s 0”到“splendor.s 4”的5个数据,通过 create() 方法创建 Flux 对象的方式非常灵活。 

通过 Mono 对象创建响应式流

对于 Mono 而言,可以认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。

针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。

justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素,该方法示例代码如下:

Mono.justOrEmpty(Optional.of("splendor.s")).subscribe(System.out::println);

如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件,示例代码如下:

Mono.create(sink -> sink.success("splendor.s")).subscribe(System.out::println);

 操作符分类

Reactor 库中的 API 提供了一组丰富的操作符,这些操作符为响应式流规范提供了很大的一个便利性。但 Reactor 中所提供的操作符数量众多,这里只针对几类具有代表性的操作符来讨论。

我将 Flux 和 Mono 操作符分成如下六大类型:

  • 转换(Transforming)操作符,负责将序列中的元素转变成另一种元素;

  • 过滤(Filtering)操作符,负责将不需要的数据从序列中剔除出去;

  • 组合(Combining)操作符,负责将序列中的元素进行合并、连接和集成;

  • 条件(Conditional)操作符,负责根据特定条件对序列中的元素进行处理;

  • 裁剪(Reducing)操作符,负责对序列中的元素执行各种自定义的裁剪操作;

  • 工具(Utility)操作符,负责一些针对流式处理的辅助性操作。

 其中,前面三种操作符统称为“转换类”操作符,剩余的三大类统称为“裁剪类”操作符。

转换类操作符 

    转换类操作符在我们编码的时候最常见了,比如 buffer、window、map 和 flatMap 等。

  • buffer 操作符

buffer 操作符的作用相当于把当前流中的元素统一收集到一个集合中,并把这个集合对象作为新的数据流。使用 buffer 操作符在进行元素收集时,可以指定集合对象所包含的元素的最大数量。

 给定一个 String 值的 Flux,每个值都包含一个水果的名称,你可以创建一个新的 List 集合的 Flux,其中每个 List 的元素数不超过指定的数目。

Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
fruitFlux.buffer(3).subscribe(System.out::println);

运行代码控制台会打印:

["Apple", "Orange", "Grape"]
["Banana", "Strawberry"]
  • window 操作符

window 操作符的作用类似于 buffer,不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,而不是一个集合。因此该操作符的返回值类型就变成了 Flux<Flux>。window 操作符相对比较复杂,如下图:

 上图比较复杂,代表的是一种对序列进行开窗的操作。我们来看下代码方便理解:

Flux.range(1, 5).window(2).toIterable().forEach(w -> {w.subscribe(System.out::println);System.out.println("-------");
});

这里我们生成了 5 个元素,然后通过 window 操作符把这 5 个元素转变成 3 个 Flux 对象。在将这些 Flux 对象转化为 Iterable 对象后,通过 forEach() 循环打印出来,运行代码控制台会打印:

1
2
-------
3
4
-------
5
  • map 操作符

map 操作符相当于一种映射操作,它对流中的每个元素应用一个映射函数从而达到转换效果,比较简单,我们来看一下示例。

Flux.just(1, 2).map(i -> "number-" + i).subscribe(System.out::println);

运行代码控制台会打印:

number-1
number-2

关于 map() 的重要理解是,映射是同步执行的,因为每个项都是由源 Flux 发布的。如果要异步执行映射,应考虑使用 flatMap() 操作。 

  • flatMap 操作符

flatMap 操作符执行的也是一种映射操作,但与 map 不同,该操作符会把流中的每个元素映射成一个流而不是一个元素,flatMap() 不是简单地将一个对象映射到另一个对象,而是将每个对象映射到一个新的 Mono 或 Flux。Mono 或 Flux 的结果被压成一个新的 Flux。当与subscribeOn() 一起使用时,flatMap() 可以释放 Reactor 类型的异步能力。然后再把得到的所有流中的元素进行合并,整个过程的流程图请看下图:

Flux.just(1, 5).flatMap(x -> Mono.just(x * x)).subscribe(System.out::println);

效果如下:

1
25

 事实上,flatMap 可以对任何你感兴趣的操作进行转换。例如,在系统开发过程中,我们经常会碰到对从数据库查询所获取的数据项逐一进行处理的场景,这时候就可以充分利用 flatMap 操作符的特性开展相关操作。

如下所示的代码演示了针对从数据库获取的 User 数据,如何使用该操作符逐一查询 User 所生成的订单信息的实现方法。

Flux<User> users = userRepository.getUsers();
users.flatMap(u -> getOrdersByUser(u))

过滤操作符

  • filter 操作符 

filter 操作符其实跟 Java8  里的 filter 方法类似,对流中的元素过滤,而过滤条件的指定一般是通过断言。 

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

 比如过滤出1到10这10个元素是偶数的数出来,其中“i % 2 == 0”代表的就是一种断言。

  • first/last 操作符

first 操作符的执行效果为返回流中的第一个元素,而 last 操作符的执行效果即返回流中的最后一个元素。

  • skip/skipLast

如果使用 skip 操作符,将会忽略数据流的前 n 个元素。类似的,如果使用 skipLast 操作符,将会忽略流的最后 n 个元素。

  • take/takeLast

take 系列操作符用来从当前流中提取元素。我们可以按照指定的数量来提取元素,也可以按照指定的时间间隔来提取元素。类似的,takeLast 系列操作符用来从当前流的尾部提取元素。

组合操作符 

Reactor 中常用的组合操作符有 then/when、merge、startWith 和 zip 等,组合操作符会比过滤操作符复杂一点。

  • then/when 操作符

then 操作符的含义是等到上一个操作完成再进行下一个。

Flux.just(1, 2, 3).then().subscribe(System.out::println);

 这里尽管生成了一个包含 1、2、3 三个元素的 Flux 流,但 then 操作符在上游的元素执行完成之后才会触发新的数据流,也就是说会忽略所传入的元素,所以上述代码在控制台上实际并没有任何输出。

和 then 一起的还有一个 thenMany 操作服务,具有同样的含义,但可以初始化一个新的 Flux 流。示例代码如下所示,这次我们会看到控制台上输出了 4 和 5 这两个元素。

Flux.just(1, 2, 3).thenMany(Flux.just(4, 5)).subscribe(System.out::println);

 对应的,when 操作符的含义则是等到多个操作一起完成。如下代码很好地展示了 when 操作符的实际应用场景。

public Mono<Void> updateOrders(Flux<Order> orders) {return orders.flatMap(file -> {Mono<Void> saveOrderToDatabase = ...;Mono<Void> sendMessage = ...;return Mono.when(saveOrderToDatabase, sendMessage);});
}

 假设我们对订单列表进行批量更新,首先把订单数据持久化到数据库,然后再发送一条通知类的消息。我们需要确保这两个操作都完成之后方法才能返回,所以用到了 when 操作符。

  • merge 操作符

merge 操作符用来把多个 Flux 流合并成一个 Flux 序列,而合并的规则就是按照流中元素的实际生成的顺序进行。

 我们通过 Flux.intervalMillis() 方法分别创建了两个 Flux 序列,然后将它们 merge 之后打印出来。

Flux.merge(Flux.intervalMillis(0, 100).take(2), Flux.intervalMillis(50, 100).take(2)).toStream().forEach(System.out::println);

 请注意,这里的第一个 intervalMillis 方法没有延迟,每隔 100 毫秒生成一个元素,而第二个 intervalMillis 方法则是延迟 50 毫秒之后才发送第一个元素,时间间隔同样是 100 毫秒。相当于两个数据序列会交错地生成数据,并合并在一起。所以以上代码的执行效果如下所示:

0
0
1
1

 和 merge 类似的还有一个 mergeSequential 方法。不同于 merge 操作符,mergeSequential 操作符则按照所有流被订阅的顺序,以流为单位进行合并。现在我们来看一下这段代码,这里仅仅将 merge 操作换成了 mergeSequential 操作。

Flux.mergeSequential (Flux.intervalMillis(0, 100).take(2), Flux.intervalMillis(50, 100).take(2)).toStream().forEach(System.out::println);

执行以上代码,我们将得到不同的结果,如下所示:

0
1
0
1

显然从结果来看,mergeSequential 操作是等上一个流结束之后再 merge 新生成的流元素。

  • zip 操作符

上面的 merge 操作符合并后的 Flux 发出的数据的顺序,与源发出的数据的时间顺序一致。由于两个 Flux 都被设置为固定频率发送数据,因此值会通过合并后的 Flux 交替出现 —— a…b…a…b 一直这样下去。如果其中任何一个 Flux 的发送时间被修改了的话,你可能会看到 2 个 a 跟在 1 个 b 后面或是 2 个 b 跟在 1 个 a 后面的情况。

因为 merge 不能保证源之间的完美交替,所以可能需要考虑使用 zip() 操作。当两个 Flux 对象压缩在一起时,会产生一个新的 Flux,该 Flux 生成一个元组,其中元组包含来自每个源 Flux 的一个项。下图说明了如何将两个 Flux 对象压缩在一起:

 使用 zip 操作符在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流,示例代码如下所示:

Flux flux1 = Flux.just(1, 2);
Flux flux2 = Flux.just(3, 4);
Flux.zip(flux1, flux2).subscribe(System.out::println);

执行效果如下:

[1,3]
[2,4]

我们可以使用 zipWith 操作符实现同样的效果,示例代码如下所示:

Flux.just(1, 2).zipWith(Flux.just(3, 4)).subscribe(System.out::println);

条件操作符

  • defaultIfEmpty 操作符

defaultIfEmpty 操作符针对空数据流提供了一个简单而有用的处理方法。该操作符用来返回来自原始数据流的元素,如果原始数据流中没有元素,则返回一个默认元素。

@GetMapping("/orders/{id}")
public Mono<ResponseEntity<Order>> findOrderById(@PathVariable String id) {return orderService.findOrderById(id).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.status(404).body(null));
}

 可以看到,这里使用 defaultIfEmpty 操作符实现默认返回值。在示例代码所展示的 HTTP 端点中,当找不到指定的数据时,我们可以通过 defaultIfEmpty 方法返回一个空对象以及 404 状态码。

  • takeUntil/takeWhile 操作符

takeUntil 操作符的基本用法是 takeUntil (Predicate predicate),其中 Predicate 代表一种断言条件,该操作符将从数据流中提取元素直到断言条件返回 true。

takeUntil 的示例代码如下所示,我们希望从一个包含 100 个连续元素的序列中获取 1~10 个元素。

Flux.range(1, 100).takeUntil(i -> i == 10).subscribe(System.out::println);

类似的,takeWhile 操作符的基本用法是 takeWhile (Predicate continuePredicate),其中 continuePredicate 代表的也是一种断言条件。与 takeUntil 不同的是,takeWhile 会在 continuePredicate 条件返回 true 时才进行元素的提取。takeWhile 的示例代码如下所示,这段代码的执行效果与 takeUntil 的示例代码一致。

Flux.range(1, 100).takeWhile(i -> i <= 10).subscribe(System.out::println);
  • skipUntil/skipWhile 操作符

与 takeUntil 相对应,skipUntil 操作符的基本用法是 skipUntil (Predicate predicate)。skipUntil 将丢弃原始数据流中的元素直到 Predicate 返回 true。

同样,与 takeWhile 相对应,skipWhile 操作符的基本用法是 skipWhile (Predicate continuePredicate),当 continuePredicate 返回 true 时才进行元素的丢弃。

裁剪操作符 

裁剪操作符通常用于统计流中的元素数量,或者检查元素是否具有一定的属性。在 Reactor 中,常用的裁剪操作符有 any 、concat、count 和 reduce 等。

  • any 操作符

any 操作符用于检查是否至少有一个元素具有所指定的属性,代码如下:

Flux.just(3, 5, 7, 9, 11, 15, 16, 17).any(e -> e % 2 == 0).subscribe(isExisted -> System.out.println(isExisted));

在这个 Flux 流中存在一个元素 16 可以被 2 除尽,所以控制台将输出“true”。

  • concat 操作符

在这个 Flux 流中存在一个元素 16 可以被 2 除尽,所以控制台将输出“true”。

  • concat 操作符

concat 操作符用来合并来自不同 Flux 的数据。与上面所介绍的 merge 操作符不同,这种合并采用的是顺序的方式,所以严格意义上并不是一种合并操作,所以我们把它归到裁剪操作符类别中。

Flux.concat(Flux.range(1, 3),Flux.range(4, 2),Flux.range(6, 5)
).subscribe(System.out::println);

我们将在控制台中依次看到 1 到 10 这 10 个数字。

  • reduce 操作符

裁剪操作符中最经典的就是这个 reduce 操作符。reduce 操作符对来自 Flux 序列中的所有元素进行累积操作并得到一个 Mono 序列,该 Mono 序列中包含了最终的计算结果。reduce 操作符示意图如下所示:

这里的 BiFunction 就是一个求和函数,用来对 1 到 10 的数字进行求和,运行结果为 55。

Flux.range(1, 10).reduce((x, y) -> x + y).subscribe(System.out::println);

与 reduce 操作符类似的还有一个 reduceWith 操作符,用来在 reduce 操作时指定一个初始值。reduceWith 操作符的代码示例如下所示,我们使用 5 来初始化求和过程,显然得到的结果将是 60。

Flux.range(1, 10).reduceWith(() -> 5, (x, y) -> x + y).subscribe(System.out::println);

 工具操作符

Reactor 中常用的工具操作符有 subscribe、timeout、block、log 和 debug 等。

  • subscribe 操作符

 subscribe 操作符用使用最多,接下来看下它的 API。 

/订阅流的最简单方法,忽略所有消息通知
subscribe();//对每个来自 onNext 通知的值调用 dataConsumer,但不处理 onError 和 onComplete 通知
subscribe(Consumer<T> dataConsumer);//在前一个重载方法的基础上添加对 onError 通知的处理
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer);//在前一个重载方法的基础上添加对 onComplete 通知的处理
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer, Runnable completeConsumer);//这种重载方法允许通过请求足够数量的数据来控制订阅过程
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer, Runnable completeConsumer, Consumer<Subscription> subscriptionConsumer);//订阅序列的最通用方式,可以为我们的 Subscriber 实现提供所需的任意行为
subscribe(Subscriber<T> subscriber);
  • timeout 操作符

timeout 操作符非常简单,保持原始的流发布者,当特定时间段内没有产生任何事件时,将生成一个异常。

  • block 操作符

 顾名思义,block 操作符在接收到下一个元素之前会一直阻塞。block 操作符常用来把响应式数据流转换为传统数据流。例如,使用如下方法将分别把 Flux 数据流和 Mono 数据流转变成普通的 List对象和单个的 Order 对象,我们同样可以设置 block 操作的等待时间。

public List<Order> getAllOrders() {return orderservice.getAllOrders().block(Duration.ofSecond(5));
}public Order getOrderById(Long orderId) {return orderservice.getOrderById(orderId).block(Duration.ofSecond(2));
}
  • log 操作符

Reactor 中专门提供了针对日志的工具操作符 log,它会观察所有的数据并使用日志工具进行跟踪。我们可以通过如下代码演示 log 操作符的使用方法,在 Flux.just() 方法后直接添加 log() 函数。

Flux.just(1, 2).log().subscribe(System.out::println);

 以上代码的执行结果如下所示(为了显示简洁,部分内容和格式做了调整)。通常,我们也可以在 log() 方法中添加参数来指定日志分类的名称。

Info: | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
Info: | request(unbounded)
Info: | onNext(1)
1
Info: | onNext(2)
2
Info: | onComplete()

相关文章:

响应式操作实战案例

Project Reactor 框架 在Spring Boot 项目 Maven 中添加依赖管理。 <dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId> </dependency><dependency><groupId>io.projectreactor</g…...

NetApp AFF A900:针对任务关键型应用程序的解决方案

NetApp AFF A900&#xff1a;适用于数据中心的解决方案 AFF A 系列中的 AFF A900 高端 NVMe 闪存存储功能强大、安全可靠、具有故障恢复能力&#xff0c;提供您为任务关键型企业级应用程序提供动力并保持数据始终可用且安全所需的一切。 AFF A900&#xff1a;针对任务关键型应…...

使用Houdini输出四面体网格并输出tetgen格式

我们的目标是从houdini输出生成的四面体&#xff0c;希望是tetgen格式的。 众所周知&#xff0c;houdini是不能直接输出四面体的。 有三方案去解决&#xff1a; 输出点云ply文件&#xff0c;然后利用tetgen生成网格。输出Hounidi内置的.geo格式文件&#xff0c;然后写个脚本…...

组合预测 | MATLAB实现EMD-KPCA-LSTM、EMD-LSTM、LSTM多输入单输出回归预测对比

组合预测 | MATLAB实现EMD-KPCA-LSTM、EMD-LSTM、LSTM多输入单输出回归预测对比 目录 组合预测 | MATLAB实现EMD-KPCA-LSTM、EMD-LSTM、LSTM多输入单输出回归预测对比预测效果基本介绍模型描述程序设计参考资料预测效果 基本介绍 MATLAB实现EMD-KP...

【C语言】操作符详解总结(万字)

操作符详解1. 操作符分类2. 算术操作符3. 移位操作符3.1 整数的二进制是怎么形成的3.2 左移操作符3.3 右移操作符4. 位操作符5. 赋值操作符6. 单目操作符6.1 单目操作符介绍6.2 sizeof 和 数组7. 关系操作符8. 逻辑操作符9. 条件操作符9.1 练习19.2 练习210. 逗号表达式11. 下标…...

mac系统手册(帮助/说明)

文章目录1. mac自带的帮助文档2. Mac使用技巧&#xff08;提示&#xff09;2.1 聚焦搜索2.2 截图&#xff08;录制屏幕&#xff09;2.3 调出右键菜单2.4 快速查看2.5 翻译2.5.1 词典解释2.5.2 翻译&#xff08;字、词和句&#xff09;3. macOS使用手册3.1 在聚焦中进行计算和转…...

VLC播放器Demo(录像,截图等功能),Android播放器Demo可二次开发。

VLC播放器Demo&#xff08;录像&#xff0c;截图等功能&#xff09;&#xff0c;可二次开发。 GitHub地址:https://github.com/ILoveLin/VlcRecordPlayer GitHub地址:https://github.com/ILoveLin/VlcRecordPlayer GitHub地址:https://github.com/ILoveLin/VlcRecordPlayer …...

WeSpeaker支持C++部署链路

WeSpeaker正式更新C部署链路&#xff0c;推理引擎使用OnnxRuntime&#xff0c;支持从语音中提取Speaker Embedding信息&#xff0c;代码详见WeSpeaker/runtime[1]。 Libtorch和onnx的选择? Speaker Embedding提取任务流程简单&#xff0c;并且声纹模型&#xff08;如ResNet\E…...

window vscode编辑appsmith源码

前言 本来最开始用的idea打开wsl中的appsmith&#xff0c;卡得一批。最后没办法&#xff0c;用自己的电脑装成ubuntu server&#xff0c;然后vscode的远程开发对appsmith源码进行编辑。如果自己电脑内存16个G或者更大可能打开wsl中的估计会还好&#xff0c;我公司电脑只有8g所…...

操作系统面试题

操作系统一、简介篇1.解释一下什么是操作系统2.操作系统的主要功能3.软件访问硬件的几种方式4.操作系统的主要目的是什么5.为什么Linux系统下的应用程序不能直接在Windows下运行6.什么是用户态和内核态7.用户态和内核态如何切换8.什么是内核二、进程和线程篇1.多处理系统的优势…...

Kafka入门(七)

下面聊聊Kafka的配置参数&#xff0c;包括生产者的配置参数、Broker的配置参数、消费者的配置参数。 1、生产者配置参数 acks 该参数控制了生产者的消息发送确认机制&#xff0c;用于指定分区中必须有多少个副本成功接收到消息后生产者才会认为这条消息写入是成功的&#xff0c…...

微服务介绍

微服务 微服务架构发展 微服务这个概念最早是在2011年5月威尼斯的一个软件架构会议上讨论提出的&#xff0c;用于描述一些作为通用架构风格的设计原则&#xff1b;2012年3月在波兰举行的Degree Conference大会&#xff0c;james lewis做演讲&#xff0c;讨论了微服务一些原则…...

搭建SpringBoot多模块微服务项目脚手架(三)

搭建SpringBoot多模块微服务项目脚手架(三) 文章目录搭建SpringBoot多模块微服务项目脚手架(三)1.概述项目结构2.接口返回统一信息模板2.1.封装返回统一信息思路介绍2.2.封装json数据格式1.导入依赖2.封装code码3.封装json格式模板4.使用统一返回信息3.接口统一请求信息模板3.1…...

对vue3中reactive、toref、torefs、ref的详细理解

reactive&#xff1a;将平常的一个对象转换成响应式对象。所谓的响应式对象就是当页面点击修改此对象时&#xff0c;页面无需刷新而在页面上的其他地方有用到这个对象的地方会自动同步修改过来例如&#xff1a; <template><div class"container"><di…...

C++ Primer Plus 第6版 读书笔记(6) 第 6 章 分支语句和逻辑运算符

第 6 章 分支语句和逻辑运算符 C是在 C 语言基础上开发的一种集面向对象编程、泛型编程和过程化编程于一体的编程语言&#xff0c;是C语言的超集。本书是根据2003年的ISO/ANSI C标准编写的&#xff0c;通过大量短小精悍的程序详细而全面地阐述了 C的基本概念和技术&#xff0c;…...

Java Class 加密工具 ClassFinal

Jar包加密工具 ClassFinal介绍环境依赖使用说明下载加密命令行示例maven插件方式无密码模式机器绑定启动加密后的jar启动参数给密码不加密码参数直接启动1. 密码文件获取2. 交互输入参考资料介绍 ClassFinal 是一款 java class 文件安全加密工具&#xff0c;支持直接加密jar包…...

【蓝桥杯集训·每日一题】AcWing 3555. 二叉树

文章目录一、题目1、原题链接2、题目描述二、解题报告1、思路分析2、时间复杂度3、代码详解三、知识风暴最近公共祖先一、题目 1、原题链接 3555. 二叉树 2、题目描述 给定一个 n 个结点&#xff08;编号 1∼n&#xff09;构成的二叉树&#xff0c;其根结点为 1 号点。 进行 m…...

【JavaScript运行原理之V8引擎】V8引擎解析JavaScript代码原理

1. 编程语言的执行 高级语言最终都需要编译为低级语言才能被硬件执行&#xff0c;越高级的语言中间的转换时间越长&#xff0c;效率越低&#xff0c;越低级的语言执行素的越快&#xff0c;但是由于缺少高级语言便捷的语法特性所以很难编写代码。 2. 大杂烩JS 它是作者在1995…...

C++11:智能指针

文章目录1. 介绍1.1 动态内存与智能指针2. 使用2.1 创建2.2 使用3. 原理3.1 RAII3.2 像指针一样使用3.3 支持智能指针对象拷贝auto_ptrRAII4. 标准库中的智能指针4.1 unique_ptr模拟实现4.2 shared_ptr引用计数模拟实现定制删除器4.3 weak_ptrshared_ptr造成的循环引用问题与sh…...

ccc-pytorch-RNN(7)

文章目录一、RNN简介二、RNN关键结构三、RNN的训练方式四、时间序列预测五、梯度弥散和梯度爆炸问题一、RNN简介 RNN&#xff08;Recurrent Neural Network&#xff09;中文循环神经网络&#xff0c;用于处理序列数据。它与传统人工神经网络和卷积神经网络的输入和输出相互独立…...

docker安装(linux)

安装需要的软件包 yum install -y yum-utils 设置stable镜像仓库&#xff08;使用阿里云镜像&#xff09; yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo 更新yum软件包索引 yum makecache fast 安装DOCKER 引擎 yum -y…...

【数据库概论】10.1 事务及其作用

事务是一系列的数据库操作&#xff0c;是数据库应用程序的基本逻辑单元 10.1 事务的基本概念 1.事务 事务是用户定义的一个数据库操作序列&#xff0c;是一个具有原子性的操作&#xff0c;不可再分&#xff0c;一个事务内的操作要么全做、要么都不做。一般来说&#xff0c;一…...

通讯录(C++实现)

系统需求通讯录是一个可以记录亲人、好友信息的工具。本章主要利用C来实现一个通讯录管理系统系统中需要实现的功能如下:添加联系人:向通讯录中添加新人&#xff0c;信息包括&#xff08;姓名、性别、年龄、联系电话、家庭住址&#xff09;最多记录1000人显示联系人:显示通讯录…...

轻松掌握C++的模板与类模板,将Tamplate广泛运用于我们的编程生活

C提高编程 本阶段主要针对C泛型编程和STL技术做详细讲解&#xff0c;探讨C更深层的使用 泛型编程:编写与类型无关的通用代码,是代码复用的一种手段。 模板 1.模板的概念 模板就是建立通用的模具&#xff0c;大大提高复用性 例如&#xff1a; 2.函数模板 C另一种编程思想称…...

pandas 数据预处理+数据概览 处理技巧整理(持续更新版)

这篇文章主要是整理下使用pandas的一些技巧&#xff0c;因为经常不用它&#xff0c;这些指令忘得真的很快。前段时间在数模美赛中已经栽过跟头了&#xff0c;不希望以后遇到相关问题的时候还去网上查&#xff08;主要是太杂了&#xff09;。可能读者跟我有一样的问题&#xff0…...

mmdetectionV2.x版本 训练自己的VOC数据集

mmdetection目录下创建data文件夹&#xff0c;路劲如图所示&#xff0c;不带yololabels 修改配置文件 mmdet/datasets/voc.py 配置图片格式 mmdet/datasets/xml_style.py 如果图片是jpg则改成jpg&#xff0c;是png格式就改成png&#xff0c;这里我不需要改&#xff0c;本…...

Shell - crontab 定时 git 拉取并执行 maven 打包

目录 一.引言 二.踩坑与实践 1.原始代码 2.mvn package 未执行与解决 [导入环境变量] 3.git pull 未执行与解决 [添加绝对路径] 三.总结 一.引言 git 任务部署在通道机&#xff0c;每天6点需要定时更新 jar 包并打包上线&#xff0c;所以需要在 linux 服务器上&#xff…...

408考研计算机之计算机组成与设计——知识点及其做题经验篇目3:指令的寻址方式

上篇文章我们讲到&#xff0c;指令的基本格式&#xff0c;一条指令通常包括操作码字段和地址码字段两部分&#xff1a; 操作码字段地址码字段并且我们还讲到根据操作数地址码的数目不同&#xff0c;可将指令分为零一二三四地址指令。感兴趣的小伙伴们可以看看小编的上一篇文章…...

前端包管理工具:npm,yarn、cnpm、npx、pnpm

包管理工具npm Node Package Manager&#xff0c;也就是Node包管理器&#xff1b; 但是目前已经不仅仅是Node包管理器了&#xff0c;在前端项目中我们也在使用它来管理依赖的包&#xff1b; 比如vue、vue-router、vuex、express、koa、react、react-dom、axios、babel、webpack…...

推荐系统 FM因式分解

reference&#xff1a;知乎 FM算法解析 LR算法没有二阶交叉 如果是id类特征&#xff0c;这里的x是0/1&#xff0c;raw的特征输入就是float&#xff0c;当然&#xff0c;在我的理解里&#xff0c;一般会把raw的特征进行分桶&#xff0c;还是映射到0/1特征&#xff0c;不然这个w…...

Maven基础入门

文章目录Maven简介Maven 工作模式1.仓库2.坐标Maven的基本使用1.常用命令2.生命周期依赖管理1.依赖配置2.依赖传递3.可选依赖4.排除依赖5.依赖范围IDEA配置MavenMaven简介 Apache Maven 是一个项目管理和构建工具&#xff0c;它基于项目对象模型(POM)的概念&#xff0c;通过一…...

传输层协议 TCP UDP

目录 协议前菜 端口号 ​编辑端口号范围划分 认识知名端口号(Well-Know Port Number) netstat pidof 传输层协议 UDP协议 UDP协议端格式 UDP的特点 面向数据报 UDP的缓冲区 UDP使用注意事项 基于UDP的应用层协议 TCP协议 TCP协议概念 TCP协议段格式 标志…...

一点就分享系列(实践篇6——上篇)【迟到补发】Yolo-High_level系列算法开源项目融入V8 旨在研究和兼容使用【持续更新】

一点就分享系列&#xff08;实践篇5-补更篇&#xff09;[迟到补发]—Yolo系列算法开源项目融入V8旨在研究和兼容使用[持续更新] 题外话 去年我一直复读机式强调High-level在工业界已经饱和的情况&#xff0c;目的是呼吁更多人看准自己&#xff0c;不管是数字孪生交叉领域&#…...

buu RSA 1 (Crypto 第一页)

题目描述&#xff1a; 两个文件&#xff0c;都用记事本打开&#xff0c;记住用记事本打开 pub.key: -----BEGIN PUBLIC KEY----- MDwwDQYJKoZIhvcNAQEBBQADKwAwKAIhAMAzLFxkrkcYL2wch21CM2kQVFpY97 /AvKr1rzQczdAgMBAAE -----END PUBLIC KEY-----flag.enc: A柪YJ^ 柛x秥?y…...

Python 二分查找:bisect库的使用

✅作者简介&#xff1a;人工智能专业本科在读&#xff0c;喜欢计算机与编程&#xff0c;写博客记录自己的学习历程。 &#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&…...

性能优化之HBase性能调优

HBase是Hadoop生态系统中的一个组件&#xff0c;是一个分布式、面向列存储的内存型开源数据库&#xff0c;可以支持数百万列&#xff08;MySQL4张表在HBase中对应1个表&#xff0c;4个列&#xff09;、超过10亿行的数据存储。可用作&#xff1a;冷热数据分离HBase适合作为冷数据…...

图像金字塔,原理、实现及应用

什么是图像金字塔 图像金字塔是对图像的一种多尺度表达&#xff0c;将各个尺度的图像按照分辨率从小到大&#xff0c;依次从上到下排列&#xff0c;就会形成类似金字塔的结构&#xff0c;因此称为图像金字塔。 常见的图像金字塔有两类&#xff0c;一种是高斯金字塔&#xff0…...

08-Oracle游标管理(定义,打开、获取数据及关闭游标)

目标 1.确定何时需要显示游标2.声明、打开和关闭显示游标3.从显示游标中提取数据4.了解与游标有关的属性5.使用游标FOR循环检索游标中的数据6.在游标FOR循环的子查询中声明游标7.评估使用逻辑运算符结合在一起的布尔条件游标 1、在使用一个PL/SQL块来执行DML语句或只返回一行结…...

Python判断字符串是否包含特定子串的7种方法

目录1、使用 in 和 not in2、使用 find 方法3、使用 index 方法4、使用 count 方法5、通过魔法方法6、借助 operator7、使用正则匹配转自&#xff1a;https://cloud.tencent.com/developer/article/1699719我们经常会遇这样一个需求&#xff1a;判断字符串中是否包含某个关键词…...

aop实现接口访问频率限制

引言 项目开发中我们有时会用到一些第三方付费的接口&#xff0c;这些接口的每次调用都会产生一些费用&#xff0c;有时会有别有用心之人恶意调用我们的接口&#xff0c;造成经济损失&#xff1b;或者有时需要对一些执行时间比较长的的接口进行频率限制&#xff0c;这里我就简…...

Hive---窗口函数

Hive窗口函数 其他函数: Hive—Hive函数 文章目录Hive窗口函数开窗数据准备建表导入数据聚合函数window子句LAG(col,n,default_val) 往前第 n 行数据LEAD(col,n, default_val) 往后第 n 行数据ROW_NUMBER() 会根据顺序计算RANK() 排序相同时会重复&#xff0c;总数不会变DENSE…...

JavaSe第7次笔记

1. C语言里面&#xff0c;NULL是0地址。Java中null和0地址没关系。 2.数组可以做方法的返回值。 3.可以使用变量作为数组的个数开辟空间。 4.断言assert&#xff0c;需要设置。 5.排序&#xff1a;Arrays. sort(array); 6.查找&#xff1a; int index Arrays. binarySea…...

什么是 Service 以及描述下它的生命周期。Service 有哪些启动方法,有 什么区别,怎样停用 Service?

在 Service 的生命周期中,被回调的方法比 Activity 少一些,只有 onCreate, onStart, onDestroy, onBind 和 onUnbind。 通常有两种方式启动一个 Service,他们对 Service 生命周期的影响是不一样的。 1. 通过 startService Service 会经历 onCreate 到 onStart,然后处于运行…...

Redis部署

JAVA安装 mkdir /usr/local/javacd /usr/local/java/wget --no-check-certificate --no-cookies --header "Cookie: oraclelicenseaccept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u13…...

AT32F437制作Bootloader然后实现Http OTA升级

首先创建一个AT32F437的工程&#xff0c;然后发现调试工程配置这里的型号和创建工程选的型号不一致&#xff0c;手动更改一下&#xff0c;使用PW Link下载程序的话还要配置一下pyocd.exe的路径。 打开drv_clk.c文件的调试功能看下系统时钟频率。 项目使用的是AT32F437VMT7芯片&…...

Springboot项目启动初始化数据缓存

1.从Java EE5规范开始&#xff0c;Servlet中增加了两个影响Servlet生命周期的注解&#xff0c; PostConstruct和PreDestroy&#xff0c;这两个注解被用来修饰一个非静态的void&#xff08;&#xff09;方法&#xff0c;被PostConstruct修饰的方法会在服务器加载Servlet的时候运…...

深度学习必备知识——模型数据集Yolo与Voc格式文件相互转化

在深度学习中&#xff0c;第一步要做的往往就是处理数据集,尤其是学习百度飞桨PaddlePaddle的小伙伴&#xff0c;数据集经常要用Voc格式的&#xff0c;比如性能突出的ppyolo等模型。所以学会数据集转化的本领是十分必要的。这篇博客就带你一起进行Yolo与Voc格式的相互转化&…...

数据、数据资源及数据资产管理的区别

整理不易&#xff0c;转发请注明出处&#xff0c;请勿直接剽窃&#xff01; 点赞、关注、不迷路&#xff01; 摘要&#xff1a;数据、数据资源、数据资产 数据、数据资源及数据资产的区别 举例 CRM系统建设完成后会有很多数据&#xff0c;这些数据就是原始数据&#xff0c;业务…...

标度不变性(scale invariance)与无标度(scale-free)概念辨析

文章目录标度标度种类名义标度序级标度等距标度比率标度常用标度方法不足标度不变性标度不变&#xff08;Scale-invariant&#xff09;曲线和自相似性&#xff08;self-similarity&#xff09;射影几何分形随机过程中的标度不变性标度不变的 Tweedie distribution普适性&#x…...

WMS仓库管理系统解决方案,实现仓库管理一体化

仓库是企业的核心环节&#xff0c;若没有对库存的合理控制和送货&#xff0c;将会造成成本的上升&#xff0c;服务品质的难以得到保证&#xff0c;进而降低企业的竞争能力。WMS仓库管理系统包括基本信息&#xff0c;标签&#xff0c;入库&#xff0c;上架&#xff0c;领料&…...