【实战】Spring Cloud Stream 3.1+整合Kafka
文章目录
- 前言
- 新版版本优势
- 实战演示
- 增加maven依赖
- 增加applicaiton.yaml配置
- 新增Kafka通道消费者
- 新增发送消息的接口
- 实战测试
- postman发送一个正常的消息
- postman发送异常消息
前言
之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

新版版本优势
新版提倡用函数式进行发送和消费信息
定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +
在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。
比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:
spring:kafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:binder:brokers: ${spring.kafka.bootstrap-servers}binders:kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub function:definition: inputChannel,outputChannelbindings:inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainoutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目
此时消息生产者为:
@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}
此时消息消费者为:
@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息Payloa:" + message.getPayload());System.out.println("接收到消息Header:" + message.getHeaders());};}
}
实战演示
我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:
正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费
本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。
增加maven依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.4</version></dependency>
</dependencies>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
增加applicaiton.yaml配置
spring:#kafkakafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka: # kafka配置binder:brokers: ${spring.kafka.bootstrap-servers}auto-add-partitions: true #自动分区auto-create-topics: true #自动创建主题replication-factor: 3 #副本min-partition-count: 3 #最小分区bindings:outputChannel-out-0:producer:# 无限制重发不产生消息丢失retries: Integer.MAX_VALUE#acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低#acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中#acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长#可以设置的值为:all, -1, 0, 1acks: allmin:insync:replicas: 3 #感知副本数inputChannel-in-0:consumer:concurrency: 1 #消费者数量max-concurrency: 5 #最大消费者数量recovery-interval: 3000 #3s 重连auto-rebalance-enabled: true #主题分区消费者组成员自动平衡auto-commit-offset: false #手动提交偏移量enable-dlq: true # 开启 dlq队列dlq-name: test-kafka-topic.dlqdeserializationExceptionHandler: sendToDlq #异常加入死信binders: # 与外部mq组件绑定kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub #默认绑定function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)definition: inputChannel;outputChannel;dlqChannelbindings: # 通道绑定inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10soutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目dlqChannel-in-0:binder: kafkahubdestination: test-kafka-topic.dlqgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10sdlqChannel-out-0:binder: kafkahubdestination: test-kafka-topic.dlqcontent-type: text/plainproducer:partition-count: 3 #分区数目
新增Kafka通道消费者
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.function.Consumer;/*** KafkaCustomer* @author senfel* @version 1.0* @date 2024/6/18 15:22*/
@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息:" + message.getPayload());System.out.println("接收到消息:" + message.getHeaders());if(message.getPayload().contains("9")){boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());System.err.println("向dlqChannel发送消息:"+send);}};}/*** dlqChannel 死信消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> dlqChannel(){return message -> {System.out.println("死信dlqChannel接收到消息:" + message.getPayload());System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());};}
}
新增发送消息的接口
@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}
实战测试
postman发送一个正常的消息

postman发送异常消息

相关文章:
【实战】Spring Cloud Stream 3.1+整合Kafka
文章目录 前言新版版本优势实战演示增加maven依赖增加applicaiton.yaml配置新增Kafka通道消费者新增发送消息的接口 实战测试postman发送一个正常的消息postman发送异常消息 前言 之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太…...
java之可变字符串之append方法
可变字符串如果要添加内容,需要用到append方法 语法格式如下 sbf.append(obj) 其中sbf是任意的可变字符串 obj是任意数据类型的对象 这个方法是将任意数据转换成字符串,然后添加到此序列中 public class Buffer {public static void main(String[]…...
[保姆级教程]uniapp自定义导航栏
文章目录 导文隐藏默认导航栏:全局隐藏当前页面隐藏 添加自定义导航栏视图:手写导航栏组件导航栏 导文 在 UniApp 中,自定义导航栏通常涉及到隐藏默认的导航栏,并在页面顶部添加自定义的视图组件来模拟导航栏的功能。 隐藏默认导航…...
项目训练营第二天
项目训练营第二天 用户登录逻辑 1、账户名不少于4位 2、密码不少于8位 3、数据库表中能够查询到账户、密码 4、密码查询时用同样加密脱敏处理手段处理后再和数据库中取出字段进行对比,如果账户名未查询到,直接返回null 5、后端设置相应的脱敏后用户的s…...
考研数学一有多难?130+背后的残酷真相
考研数学一很难 大家平时在网上上看到很多人说自己考了130,其实这些人只占参加考研数学人数的极少部分,有个数据可以展示出来考研数学到底有多难: 在几百万考研大军中,能考到120分以上的考生只有2%。绝大多数人的分数集中在30到…...
vue2脚手架笔记总结1
1.什么是组件 组件是实现局部代码和功能资源的集合 2.vue.config.js配置文件 使用vue inspect > output.js可以查看到Vue脚手架的默认配置,但是在这里面修改不会影响实际的配置,如果需要修改配置需要使用用vue.config.js文件,详情见:https://cli.vuej…...
校园巡礼:一周只上四天课,入学即发钱?深圳理工大学,开局即王炸
校园巡礼 | 一周只上四天课,入学即发钱?深圳理工大学,开局即王炸! 会议之眼 快讯 目前各省的高考成绩现已陆续揭晓,广东省教育考试院发布了2024年高考录取最低分数线,物理类本科线为442分,历史…...
免交互 实验
免交互 交互:我们发出指令控制程序的运行,程序在接收到指令之后按照指令的效果做出对应的反应。 免交互:间接的,通过第三方的方式把指令传送给程序,不用直接下达指令。 Here document 免交互 这是命令行格式&#…...
Sublime Text 设置
备份 {"font_size": 10,"index_files": true,"font_face": "Courier New","vintage_start_in_command_mode": false,"ignored_packages": ["Vintage"],"word_wrap": "false" }关闭…...
spire.Pdf 将pdf转成image
一、nuget安装 <ItemGroup><PackageReference Include"Spire.PDF" Version"10.6.7" /></ItemGroup> 二、直接上代码 using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; using System; using System.IO;namespace …...
仓颉编程语言 -- 初识(一)
官网 文档 原生智能化 内嵌AgentDSL的编程框架,自然语言&编程语言有机融合;多Agent协同,简化符号表达,模式自由组合,支持各类智能应用开发。 天生全场景 轻量化可缩放运行时,模块化分层设计…...
前端JS必用工具【js-tool-big-box】学习,数值型数组的正向排序和倒向排序
这一小节,我们说一下前端 js-tool-big-box 这个工具库,添加的数值型数组的正向排序和倒向排序。 以前呢,我们的数组需要排序的时候,都是在项目的utils目录里,写一段公共方法,弄个冒泡排序啦,弄…...
python web框架哪家强?Flask、Django、FastAPI对比
前言 当你掌握了python的基础知识,并且会用和HTML和CSS编写简单的静态网页。现在你只需再掌握一个python web框架的知识,就可以开始编写一个动态的网站了。目前市面比较流程的python web框架有三个flask、Django、FastAPI。接下来我们对比一下。他们三个…...
Mybatis plus:IService接口
一、介绍 在MybatisPlus框架中,IService接口扮演着重要的角色。作为一个通用的服务接口,IService定义了一系列方法,包括查询、插入、更新、删除等。这些方法的定义使得在服务层进行数据库操作变得更为便捷和高效。 IService 接口是一个泛型接…...
时序分析基本概念介绍——min pulse width 最小脉冲宽度
文章目录 前言一、什么是 min pulse width?二、为什么检查 min pulse width?三、如何设置 min pulse width约束?1. 在sdc里面定义2. library里面定义 四、如何检查 min pulse width?五、如何修复 min pulse width?总结…...
PHP原生代码生成pdf---解决中文乱码问题
github地址 尝试了使用composer下载FPDF或者FPDI,但是无法解决中文乱码问题。只有使用这个github上的中文包才可以,那俩没必要下。 直接上代码(这里并没有使用任何框架) require(./fpdf/chinese.php);//生成pdf$pdf new PDF_Chinese();$pdf->AddPage…...
智慧车库管理系统
摘 要 随着城市化进程的不断加快,私家车数量的快速增长给城市交通带来了巨大的挑战,停车问题成为城市交通管理中的一大难题。车辆停车时,在停车场寻找停车位耗时过久,不仅仅浪费用户的时间,还可能引起交通拥堵。城市停…...
每日新闻掌握【2024年6月26日 星期三】
2024年6月26日 星期三 农历五月廿一 大公司/大事件 OpenAI将终止对中国提供API服务 从6月24日晚间开始,已有多名用户收到了来自OpenAI的邮件。该邮件表示,“我们的数据显示您的组织来自OpenAI目前不支持的地区的API流量。”邮件进一步表示,…...
InVEST实践及在生态系统服务供需、固碳、城市热岛、论文写作等实际项目中应用
白老师(研究员):长期从事生态系统结构-格局-过程-功能-服务的变化与响应关系等研究工作,重点围绕生物多样性、生态系统服务与价值等,构建生物地球化学模型和评价指标体系,为城市、区域和自然保护区的可持续发展和生态环…...
慧科新闻搜索研究数据库的使用指南及个人获取途径
《慧科新闻搜索研究数据库》WiseSearch由慧科讯业有限公司出品。WiseSearch是具有新闻搜索/浏览、对比分析等功能的一站式新闻搜索平台;内容包括1200种报刊和8000 网站的新闻资讯,平面媒体涵盖全国综合大报、党委机关报、都市报、行业报刊媒体࿰…...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
ServerTrust 并非唯一
NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...
Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成
一个面向 Java 开发者的 Sring-Ai 示例工程项目,该项目是一个 Spring AI 快速入门的样例工程项目,旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计,每个模块都专注于特定的功能领域,便于学习和…...
深入浅出Diffusion模型:从原理到实践的全方位教程
I. 引言:生成式AI的黎明 – Diffusion模型是什么? 近年来,生成式人工智能(Generative AI)领域取得了爆炸性的进展,模型能够根据简单的文本提示创作出逼真的图像、连贯的文本,乃至更多令人惊叹的…...
nnUNet V2修改网络——暴力替换网络为UNet++
更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 U-Net存在两个局限,一是网络的最佳深度因应用场景而异,这取决于任务的难度和可用于训练的标注数…...
SQL Server 触发器调用存储过程实现发送 HTTP 请求
文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...
