西安营销网站/企业内训
Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:
pom依赖
<dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>5.7.2</version><scope>test</scope></dependency></dependencies>
1. 创建 Mono 和 Flux
Mono
: 用于表示包含零个或一个元素的异步序列。Flux
: 用于表示包含零个或多个元素的异步序列。
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;public class ReactorCreateExample {public static void main(String[] args) {// 创建包含单个元素的 MonoMono<String> mono = Mono.just("Hello, Reactor!");// 创建包含多个元素的 FluxFlux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});mono.subscribe(System.out::println); // 输出: Hello, Reactor!flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}
2. 转换操作符
使用转换操作符对数据流进行转换或处理。
import reactor.core.publisher.Flux;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 对每个元素进行平方操作Flux<Integer> squared = source.map(x -> x * x);squared.subscribe(System.out::println); // 输出: 1, 4, 9, 16, 25}
}
3. 过滤操作符
使用过滤操作符筛选数据流中的元素。
import reactor.core.publisher.Flux;public class ReactorFilterExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 筛选偶数Flux<Integer> evenNumbers = source.filter(x -> x % 2 == 0);evenNumbers.subscribe(System.out::println); // 输出: 2, 4}
}
4. 组合操作符
使用组合操作符组合多个数据流。
import reactor.core.publisher.Flux;public class ReactorCombineExample {public static void main(String[] args) {Flux<Integer> source1 = Flux.range(1, 3);Flux<Integer> source2 = Flux.range(4, 3);// 合并两个数据流Flux<Integer> merged = Flux.concat(source1, source2);merged.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5, 6}
}
这些只是 Reactor API 的一小部分示例。Reactor 提供了丰富的操作符和方法,用于处理复杂的异步数据流。开发人员可以根据具体需求选择适当的操作符进行组合,以构建出符合业务逻辑的异步处理链。
5. 错误处理
Reactor 提供了多种处理错误的方式,例如使用 onErrorResume
, onErrorReturn
, doOnError
等方法。
import reactor.core.publisher.Flux;public class ReactorErrorHandlingExample {public static void main(String[] args) {Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);// 处理除零异常并提供默认值Flux<Integer> result = source.map(x -> 10 / x).onErrorResume(ex -> Flux.just(-1));result.subscribe(System.out::println); // 输出: 10, 5, -1}
}
6. 背压处理
Reactor 提供了背压处理的支持,允许生产者和消费者之间实现合理的数据流控制。使用 onBackpressureBuffer
或者其他背压操作符可以处理高速生产者和慢速消费者之间的数据流。
import reactor.core.publisher.Flux;public class ReactorBackpressureExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 1000);// 设置缓冲区大小Flux<Integer> buffered = source.onBackpressureBuffer(10);buffered.subscribe(data -> {// 模拟慢速消费者try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(data);},error -> System.err.println("Error: " + error),() -> System.out.println("Done"));}
}
- TODO:未能实现没有背压和有背压的对比
7. 使用 Reactor WebFlux 处理 Web 请求
Reactor 还提供了 WebFlux 模块,用于处理响应式的 Web 请求。以下是一个简单的示例:
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;@RestController
public class WebFluxController {@GetMapping("/hello")public Mono<ResponseEntity<String>> hello() {return Mono.just(ResponseEntity.ok("Hello, Reactor WebFlux!"));}
}
8. Reactor 核心概念
Reactor 中有一些核心概念,了解这些概念有助于更好地使用 Reactor API。
-
Publisher(发布者): 代表一个生产数据的源头,通常是
Mono
或Flux
。 -
Subscriber(订阅者): 用于消费数据流的组件。通过
subscribe
方法订阅Publisher
。 -
Subscription(订阅): 代表
Subscriber
和Publisher
之间的连接。Subscriber
可以使用Subscription
来请求数据,取消订阅等。 -
Processor(处理器): 既是
Publisher
又是Subscriber
,用于在两者之间进行转换和处理。
public class ReactorCoreConceptsExample {public static void main(String[] args) {// 创建发布者Flux<Integer> source = Flux.range(1, 5);// 创建处理器,并进行数据处理UnicastProcessor<Integer> processor = UnicastProcessor.create();source.map(value -> value * 2) // Example: doubling the values.subscribe(processor);// 创建订阅者CustomSubscriber<Integer> subscriber = new CustomSubscriber<>();// 订阅并处理数据processor.subscribe(subscriber);}// 自定义订阅者static class CustomSubscriber<T> extends BaseSubscriber<T> {@Overrideprotected void hookOnNext(T value) {System.out.println("Processed Value: " + value);}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable);}@Overrideprotected void hookOnComplete() {System.out.println("Done");}}
}
UnicastProcessor.create()
已弃用,可以使用Sinks.many().unicast().onBackpressureBuffer()
9. Reactor 调度器
Reactor 提供了多种调度器,用于控制异步操作的执行线程。例如,Schedulers.boundedElastic()
创建了一个弹性线程池,可以根据需要动态调整线程数。
public class ReactorSchedulersExample {public static void main(String[] args) {Flux.range(1, 5).publishOn(Schedulers.boundedElastic()) // 在弹性线程池上发布.map(x -> x * x).subscribeOn(Schedulers.parallel()) // 在并行线程池上订阅.subscribe(System.out::println);}
}
- 经测试,大概率只使用了一个线程
11. 组合多个 Mono 或 Flux
使用 zip
操作符可以组合多个 Mono
或 Flux
,将它们的元素进行组合。
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorZipExample {public static void main(String[] args) {Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("Reactor");// 将两个 Mono 合并为一个 FluxFlux<String> result = Flux.zip(mono1, mono2).map(tuple -> tuple.getT1() + " " + tuple.getT2());result.subscribe(System.out::println); // 输出: Hello Reactor}
}
12. 超时操作
使用 timeout
操作符可以在指定的时间内等待数据流产生结果,如果超时,则触发错误。
public class ReactorTimeoutExample {public static void main(String[] args) throws InterruptedException {Flux<Integer> source = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(2)); // 模拟延迟// 在指定时间内等待数据流产生结果,否则触发超时source.timeout(Duration.ofSeconds(1)).subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error),() -> System.out.println("Done"));//睡一会,等待任务执行完成Thread.sleep(3333);}
}
13. 并行操作
使用 parallel
操作符可以将一个数据流并行处理,提高处理速度。
public class ReactorParallelExample {public static void main(String[] args) throws InterruptedException {Flux.range(1, 10).parallel().runOn(Schedulers.parallel()).map(x -> x * x).sequential().subscribe(System.out::println);//睡一会,等待任务执行完成Thread.sleep(1111);}
}
14. 与 Java Stream 集成
Reactor 与 Java Stream 可以方便地进行集成。
import reactor.core.publisher.Flux;
import java.util.stream.Stream;public class ReactorJavaStreamIntegrationExample {public static void main(String[] args) {Flux<Integer> flux = Flux.fromStream(Stream.of(1, 2, 3, 4, 5));flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5}
}
15. 使用 Mono 和 Flux 进行条件操作
Reactor 提供了条件操作符,例如 switchIfEmpty
和 filter
,用于根据条件处理数据流。
public class ReactorConditionalOperatorsExample {public static void main(String[] args) {Flux<Integer> empty = Flux.range(1, 0);Flux<Integer> source = Flux.range(1, 5);// 如果数据流为空,则切换到另一个数据流empty.switchIfEmpty(Flux.range(6, 3)).subscribe(System.out::println); // 输出: 6,7,8// 使用 filter 过滤元素source.filter(x -> x % 2 == 0).subscribe(System.out::println); // 输出: 2, 4}
}
16. 使用 Reactor StepVerifier 进行测试
代码需要写在test测试目录下!!!
Reactor 提供了 StepVerifier
类,用于测试异步操作的行为。
public class ReactorTestingExample {public static void main(String[] args) {Flux<Integer> flux = Flux.range(1, 5);// 使用 StepVerifier 验证数据流的行为StepVerifier.create(flux).expectNext(1, 1, 3, 4, 5)//正确顺序应该是12345.expectComplete().verify();}
}
17. 使用 Mono 和 Flux 进行重试
Reactor 提供了 retryWhen
方法,结合 Backoff
操作符,用于在发生错误时进行重试。
public class ReactorRetryExample {public static void main(String[] args) throws InterruptedException {Mono<Object> source = Mono.fromCallable(() -> {throw new RuntimeException("Simulated error");})//最大重试次数为3次,初始重试间隔为1秒,并且采用指数回退策略,直到达到最大的回退时间(这里是5秒)。.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)));source.subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()));//得多睡会儿,让它跑完最大重试时间Thread.sleep(999999);}
}
19. 使用 Reactor Context 进行上下文传递
Reactor 提供了 Context
类,用于在操作链中传递上下文信息。这对于在异步操作中共享信息非常有用。
import reactor.core.publisher.Mono;
import reactor.util.context.Context;public class ReactorContextExample {public static void main(String[] args) {Mono<String> mono = Mono.deferContextual(contextView ->Mono.just("Hello, " + contextView.get("user")));String result = mono.contextWrite(Context.of("user", "John")).block();System.out.println(result); // 输出: Hello, John}
}
20. 使用 Reactor 的 doOn
方法进行副作用处理
doOn
系列方法允许在数据流的不同生命周期阶段执行副作用操作,如日志记录、统计等。
import reactor.core.publisher.Flux;public class ReactorDoOnExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);source.doOnNext(value -> System.out.println("Processing element: " + value)).doOnComplete(() -> System.out.println("Processing complete")).subscribe(System.out::println);}
}
21. 使用 Reactor 的 transform
方法进行操作链重用
transform
方法允许对操作链进行重用,将一系列操作组合为一个新的 Function
。
import reactor.core.publisher.Flux;import java.util.function.Function;public class ReactorTransformExample {public static void main(String[] args) {Flux<Integer> source = Flux.range(1, 5);// 定义一个操作链Function<Flux<Integer>, Flux<Integer>> customTransform = flux ->flux.filter(x -> x % 2 == 0).map(x -> x * 2);// 使用 transform 应用自定义操作链source.transform(customTransform).subscribe(System.out::println); // 输出: 4, 8}
}
学习打卡:Java学习笔记-day06-响应式编程Reactor API大全(上)
相关文章:

响应式编程Reactor API大全(上)
Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例: pom依赖 <dependencyMan…...

vue3自定义指令
一个自定义指令由一个包含类似组件生命周期钩子的对象来定义。钩子函数会接收到指令所绑定元素作为其参数。 页面内创建自定义指令 下面是一个自定义指令的例子,当一个 input 元素被 Vue 插入到 DOM 中后,它会被自动聚焦: <script setu…...

ECharts 多季度连续显示到一个图中。
效果图 二.相关option 以下option可以复制到 echarts的编辑器 进行查看修改 const site test1; const site2 test2;const qtrlyOption function (data: any, titleText: string): any {//获取最大值 。最大最小值的目的是:使左右里边的所有bar使用同一个指标let …...

【Microsoft Copilot】手机端发布 ——GPT-4, DALL-E3 免费用
Microsoft Copilot 关于Microsoft CopilotMicrosoft Copilot 的特点1. 可以在手机端使用:2. 可以免费使用GPT-4。3. 可以无限制地使用GPT-4。4. 可以使用DALL-E3生成图片。5. 搜索功能6. 图像识别 Microsoft Copilot的缺点和注意事项1. 非常容易报错2. 不支持长篇聊…...

[蓝桥杯 2013 省 AB] 错误票据
题目背景 某涉密单位下发了某种票据,并要在年终全部收回。 题目描述 每张票据有唯一的 ID 号,全年所有票据的 ID 号是连续的,但 ID 的开始数码是随机选定的。因为工作人员疏忽,在录入 ID 号的时候发生了一处错误,造…...

IDEA GitHub令牌原理(Personal Access Token)
1.IDEA的add github account 是什么原理? 在IntelliJ IDEA中添加GitHub账户,主要是为了让IDEA能够与GitHub进行交互,如克隆GitHub上的仓库,提交代码到GitHub等。其基本原理如下: 用户在IDEA中输入GitHub的用户名和密…...

[开发语言][python][c++]:C++中的this指针和Python中的Self -- 26岁生日
C中的this指针和Python中的Self 1. python中的Self2. C中的this指针3. C中的this指针和Python中self的异同点: 以朋友的新岁祝福开篇,祝笔者也祝大家☺️: 一岁一礼 一寸欢喜且喜且乐 且以永日 From VardoZ癸卯年十一月廿六(兔年)之…...

Android Traceview 定位卡顿问题
Traceview 是一个 Android 性能分析工具,用于时间性能分析,主要帮助开发者了解应用程序中各个方法的执行时间和调用关系。Traceview 可以通过图形化界面查看应用程序的代码执行细节,包括每个方法的调用次数、方法调用的时间消耗、方法调用堆栈…...

第三方 Cookie 被禁用?企业该如何实现用户精准运营和管理?
从 1 月 4 日开始,谷歌 Chrome 浏览器将逐步禁用第三方 Cookie 。作为全球最大的浏览器之一,Chrome 的这一动作无疑将引发行业内的重大变革。一直以来,第三方 Cookie 都是网络营销和广告的重要工具。然而,随着人们对隐私保护的日益…...

Autosar PNC网络管理配置(2)-基于ETAS软件
文章目录 BswM初始化PNC对PDU的控制BswMModeRequestPortBswMModeConditionBswMLogicalExpressionBswMRuleBswMActionListEcuMEcuMWakeupSourceEcuMShutdownCauseEcuMRbAlSwitchOffCalloutEcuMRbOnGoOff...

【SpringMVC快速使用】1.@RestController @RequestMapping 2.logback的使用
背景:为何从这个最简单的 例子写起呢? 那是因为我们的管理后台之类的都是别人写的,我也听说了大家说:只用Post请求就足够了,但是却发现,在浏览器中测试时,默认是GET请求,如果直接写…...

C2593 operator << 不明确
错误 C2593 “operator <<” 不明确,通常出现在C代码中,当你尝试使用<<运算符(通常用于输出或位移运算)时,编译器无法确定使用哪个重载版本的运算符。这个错误可能由几个原因引起: 多个重载冲突…...

vue:使用【3.0】:条件模块
一、条件层级效果图 二、代码 <template><ContentWrap><!-- 添加条件分支:level1 --><div class"btnBox" v-if"isEdit"><el-button type"primary" click"add">添加条件分支</el-button></div…...

Kafka与RabbitMQ的区别
消息队列介绍 消息队列(Message Queue)是一种在分布式系统中进行异步通信的机制。它允许一个或多个生产者在发送消息时暂时将消息存储在队列中,然后由一个或多个消费者按顺序读取并处理这些消息。 消息队列具有以下特点: 异步通…...

C++力扣题目538--把二叉搜索树转换为累加树
给出二叉 搜索 树的根节点,该树的节点值各不相同,请你将其转换为累加树(Greater Sum Tree),使每个节点 node 的新值等于原树中大于或等于 node.val 的值之和。 提醒一下,二叉搜索树满足下列约束条件&#…...

曲线生成 | 图解贝塞尔曲线生成原理(附ROS C++/Python/Matlab仿真)
目录 0 专栏介绍1 贝塞尔曲线的应用2 图解贝塞尔曲线3 贝塞尔曲线的性质4 算法仿真4.1 ROS C仿真4.2 Python仿真4.3 Matlab仿真 0 专栏介绍 🔥附C/Python/Matlab全套代码🔥课程设计、毕业设计、创新竞赛必备!详细介绍全局规划(图搜索、采样法…...

【一万字干货】一篇给你讲清楚智慧城市——附送智慧系列开发项目合集
智慧城市的概念 智慧城市(Smart City)起源于传媒领域,是指利用各种信息技术或创新概念,将城市的系统和服务打通、集成,以提升资源运用的效率,优化城市管理和服务,以及改善市民生活质量。 中国…...

关于如何禁用、暂停或退出OneDrive等操作,看这篇文件就够了
想知道如何禁用OneDrive?你可以暂停OneDrive的文件同步,退出应用程序,阻止它在启动时打开,或者永远从你的机器上删除该应用程序。我们将向你展示如何在Windows计算机上完成所有这些操作。 如何在Windows上关闭OneDrive 有多种方法可以防止OneDrive在你的电脑上妨碍你。…...

Vue3-46-Pinia-获取全局状态变量的方式
使用说明 在 Pinia 中,获取状态变量的方式非常的简单 : 就和使用对象一样。 使用思路 : 1、导入Store;2、声明Store对象;3、使用对象。 在逻辑代码中使用 但是 Option Store 和 Setup Store 两种方式定义的全局状态变量…...

数据库——DAY1(Linux上安装MySQL8.0.35(网络仓库安装))
一、环境部署 1、Red Hat Enterprise Linux 9.3 64 位 2、删除之前安装过本地镜像版本的MySQL软件(以前未安装过,请跳过此步骤) [rootlocalhost ~]# dnf remove mysql-server -y [rootlocalhost ~]# rm -rf /var/lib/mysql [rootlocalhost …...

原生微信小程序-两次设置支付密码校验,密码设置二次确认
效果 具体代码 1、wxml <view style"{{themeColor}}"><view classcontainer><view class"password_content"><view wx:if{{type 1}}><view class"title"><view class"main_title">设置支付密码…...

【Python学习】Python学习15-模块
目录 【Python学习】Python学习15-模块 前言创建语法引入模块from…import 语句from…import* 语句搜索路径PYTHONPATH 变量-*- coding: UTF-8 -*-导入模块现在可以调用模块里包含的函数了PYTHONPATH 变量命名空间和作用域dir()函数globals() 和 locals() 函数reload() 函数Py…...

ARCGIS PRO SDK 设置UI控件状态:启用/禁用
举例: 第一步:添加两个 Button 分别命名为Connect、Disconnect 第二步:nfig.daml添加状态和条件:在 DAML 中定义条件。请记住,条件存在于模块标记<modules>之外,下代码定义:Disconnected_…...

案例126:基于微信小程序的民大食堂用餐综合服务平台
文末获取源码 开发语言:Java 框架:SSM JDK版本:JDK1.8 数据库:mysql 5.7 开发软件:eclipse/myeclipse/idea Maven包:Maven3.5.4 小程序框架:uniapp 小程序开发软件:HBuilder X 小程序…...

cephfs 配置 mds stancd replay 操作
目的 1 假设有某个客户创建过千万文件目录,可以导致 ceph-mds 故障 2 backup ceph-mds 拉起时需要从内存中 replay 最后操作,可能需要吧当前目录中所有目> 录结构 重新 reload 至内存 3 这个过程可能需要几小时,可能需要几天 4 为了快速地拉起 ceph-mds 5 可以选择配置一…...

【2023我的编程之旅】系统学习C语言easyx图形库心得体会
目录 引言 C语言基础知识回顾 easyx图形库介绍 如何快速学习easyx图形库 学习笔记积累 学习成果展示 学习拓展 总结 引言 首先说一下我为什么要学习C语言easyx图形库。我接触C语言easyx图形库是在我今年一月份的时候,也是机缘巧合之下偶然在B站上看到了鸣人…...

【linux】软链接创建(linux的快捷方式创建)
软连接的概念 类似于windows系统中的快捷方式。有的文件目录很长或者每次使用都要找很不方便,于是可以用类似windows的快捷方式的软链接在home(初始目录类似于桌面)上创建一些软链接方便使用。 软链接的语法 ln -s 参数1 参数2 参数1&#…...

基于BP神经网络的光伏发电预测
目录 摘要 BP神经网络参数设置及各种函数选择 参数设置 训练函数 传递函数 学习函数 性能函数 显示函数 前向网络创建函数 BP神经网络训练窗口详解 训练窗口例样 训练窗口四部详解 基于BP神经网络的租金预测 代码下载:19-66天气预测光伏发电.rar(代码完整,数据齐全)资源-C…...

RPA财务机器人在厦门市海沧医院财务管理流程优化汇总的应用RPA全球生态 2024-01-05 17:27 发表于河北
目前国内外研究人员对于RPA机器人在财务管理流程优化领域中的应用研究层出不穷,但现有研究成果主要集中在财务业务单一领域,缺乏财务管理整体流程一体化管控的研究。RPA机器人的功能绝非单一的财务业务处理,无论从自身技术发展,或…...

应用在LCD显示器电源插头里的氮化镓(GaN)MTC-65W1C
LCD(Liquid Crystal Display)显示器是利用液晶显示技术来进行图像表现的显示装置,从液晶显示器的结构来看,无论是笔记本电脑还是桌面系统,采用的LCD显示屏都是由不同部分组成的分层结构。LCD显示器按照控制方式不同可分…...