Spring Cloud Stream 消息驱动基础入门与实践总结
Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。
【1】概念介绍
① 什么是Spring Cloud Stream
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。简单来讲,就是屏蔽了底层XXMQ,应用层不用关注底层是RabbitMQ还是Kafka 。类似于Spring Data抽离持久层屏蔽底层各种数据库的概念。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
官网文档:https://spring.io/projects/spring-cloud-stream#overview

② stream如何统一底层差异
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
-
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
-
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
-
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

③ Spring Cloud Stream标准流程设计
Stream中的消息通信方式遵循了发布-订阅模式。
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

④ 几个API注解
@EnableBinding:指信道channel和exchange绑定在一起。
@StreamListener:监听队列,用于消费者的队列的消息接收
@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
下面以RabbitMQ为底层MQ来说明如何使用Stream,当然同样要先安装好RabbitMQ。
【2】消息生产者
① pom依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
② yml配置
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称,用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbit
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: send-8001.com #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址
③ 消息服务类
public interface IMessageProvider {public String send();
}@EnableBinding(Source.class)//定义消息推送管道
@Slf4j
public class IMessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;//消息发送通道@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());log.info(serial+"***********************");return serial;}
}
编写控制器发送消息:
@RestController
public class IMessageController {@Resourceprivate IMessageProvider provider;@GetMapping("/sendMessage")public String send(){return provider.send();}
}
【3】消息消费者
① pom依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
② yml配置
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称,用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbitgroup: group1
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: receive-8002.com #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址
③ 消息接收服务
@Component
@Slf4j
@EnableBinding(Sink.class)
public class StreamController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String>message){log.info("消费者1号接收到消息"+message.getPayload()+"\t port:"+serverPort);}}
上面【2】【3】即可实现消息的发送和接收,但是假设有多个消费者,我们还要考虑两个问题:消息的重复消费和消息的持久化。
【4】group分组解决重复消费问题
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。
在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
- 不同组是可以全面消费的(重复消费),
- 同一组内会发生竞争关系,只有其中一个可以消费。
也就是说,两个消费者微服务的group定义为同一个,即可以解决重复消费问题。
【5】group分组解决消息丢失问题
即,在你服务停机重启期间,消息在不断发送,而服务启动后并没有接收到发送的消息。这是由于你没有配置group属性导致的。
解决方案:配置group属性。
spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称,用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbitgroup: group1 # 这个很重要!!!
相关文章:
Spring Cloud Stream 消息驱动基础入门与实践总结
Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。…...
你好rust
第一次安装rust,记录一下笔记。 几年前就听说过rust,自己一直是个c爱好者,所以比较抵触rust,早年还有什么rust向上突破群。一直比较抵触,直到这几年rust已经渐渐深入到linux内核、云原生可观测以及zend社区当中&#x…...
STM32 printf 重定向到CAN
最近在调试一款电机驱动板 使用的是CAN总线而且板子上只有一个CAN 想移植Easylogger到上面试试easylogger的效果,先实现pritnf的重定向功能来打印输出 只需要添加以下代码即可实现 代码 #include <stdarg.h> uint8_t FDCAN_UserTxBuffer[512]; void FDCAN_p…...
jmeter性能优化之mysql监控sql慢查询语句分析
接上次博客:基础配置 多用户登录并退出jmx文件:百度网盘 提取码:0000 一、练习jmeter脚本检测mysql慢查询 随意找一个脚本(多用户登录并退出),并发数设置300、500后分别查看mysql监控平台 启动后查看,主要查看mysql…...
海南聚广众达电子商务咨询有限公司引领行业变革
在数字化浪潮席卷全球的今天,电商行业正以前所未有的速度发展。海南聚广众达电子商务咨询有限公司,凭借其在抖音电商领域的深厚积累和不断创新,正逐步成为行业的佼佼者。这家以专注、专业、专注为核心理念的公司,不仅为客户提供全…...
Unity API学习之资源的动态加载
资源的动态加载 在实际游戏开发的更新换代中,随着开发的软件不断更新,我们在脚本中需要拖拽赋值的变量会变空,而要想重新拖拽又太花费时间,因此我们就需要用到Resources.Load<文件类型>("文件名")函数来在一开始…...
C++算法——回溯
回溯算法 实现思想 先看一个实例: //暴力枚举的算法 int n 5; for (int a 1; i < n; i) {for (int b 1; b < n; b){for (int c 1; c < n; c){for (int d 1; d < n; d){for (int e 1; e < n; e){//判断 abcde 是否互补相同if (a ! b &&a…...
java的深拷贝和浅拷贝
总结: 深拷贝:无论是基本类型还是引用类型都会创建新的实例。 浅拷贝:对于基本类型就是复制其值,对于引用类型则是复制了指向这些数据类型的内存地址。 浅拷贝(Shallow Copy) 浅拷贝是指在创建新对象时&am…...
AI产品经理,应掌握哪些技术?
美国的麻省理工学院(Massachusetts Institute of Technology)专门负责科技成果转化商用的部门研究表明: 每一块钱的科研投入,需要100块钱与之配套的投资(人、财、物),才能把思想转化为产品&…...
同三维T80004EHL-W-4K30 4K HDMI编码器,支持WEBRTC协议
输入:1路HDMI1路3.5音频,1路HDMI环出1路3.5音频解嵌输出 4K30超高清,支持U盘/移动硬盘/TF卡录制,支持WEBRTC协议,超低延时,支持3个点外网访问 1个主流1个副流输出,可定制选配POE供电模块,WEBR…...
Hi3861 OpenHarmony嵌入式应用入门--点灯
本篇实现对gpio的控制,通过控制输出进行gpio的点灯操作。 硬件 我们来操作IO2,控制绿色的灯。 软件 GPIO API API名称 说明 hi_u32 hi_gpio_deinit(hi_void); GPIO模块初始化 hi_u32 hi_io_set_pull(hi_io_name id, hi_io_pull val); 设置某个IO…...
SaaS案例分享:成功构建销售渠道的实战经验
面对SaaS产品推广的难题,你是否曾感到迷茫,不知如何选择有效的销售渠道?Shopify独立站联盟营销或许能为你提供新的思路。Shopify作为领先的电商解决方案提供商,其独立站功能为众多商家提供了强大的在线销售平台。而联盟营销&#…...
密钥管理简介
首先我们要知道什么是密钥管理? 密钥管理是一种涉及生成、存储、使用和更新密钥的过程。 密钥的种类 我们知道,对称密码主要包括分组密码和序列密码。但有时也可以将杂凑函数和消息认证码划分为这一类,将它们的密钥称为对称密钥;…...
2024中国应急(消防)品牌巡展成都站成功召开!
汇聚品牌力量,共同相聚成都。6月14日,由中国安全产业协会指导,中国安全产业协会应急创新分会、应急救援产业网联合主办,四川省消防协会协办的“一切为了安全”2024年中国应急(消防)品牌巡展-成都站成功举办。该巡展旨在展示中国应…...
ansible-Role角色批量按照node_export节点,并追加信息到Prometheus文件中
文章目录 剧本功能 inventory.yaml文件定义deploy.yaml角色定义node_exporter_lock角色定义任务角色main.yamlnode_exporter_tasks.yml角色触发任务notifyextra_tasks.yml角色prometheus_node_config.j2模板文件 执行命令查看变量 剧本功能 功能1: 批量执行node_ex…...
求最小公倍数 、小球走过路程计算 题目
题目 JAVA11 求最小公倍数分析:代码:大佬代码: JAVA12 小球走过路程计算分析:代码: JAVA11 求最小公倍数 描述 编写一个方法,该方法的返回值是两个不大于100的正整数的最小公倍数。 输入描述:…...
【Android面试八股文】你能说一说为什么IO是耗时操作?
IO(输入/输出)操作之所以是耗时操作,主要是由于以下几个原因: 1. 物理设备的限制 机械动作:传统的硬盘驱动器(HDD)包含旋转的磁盘和移动的磁头,以读取或写入数据。这些机械动作需要时间完成。虽然固态硬盘(SSD)没有机械部件,但它们仍然受到电子信号传输速度的限制。…...
怎样增强 CLike 游戏的社交功能,促进玩家之间的互动和交流?
要增强CLike游戏的社交功能,以促进玩家之间的互动和交流,可以考虑以下几个方面: 添加聊天功能:在游戏中加入实时聊天功能,让玩家可以在游戏内互相交流。可以通过文本聊天或者语音聊天来实现。 社交平台集成࿱…...
12_YouOnlyLookOnce(YOLOv3)新一代实时目标检测技术
1.1 回顾V1和V2 V1:05_YouOnlyLookOnce(YOLOV1)目标检测领域的革命性突破-CSDN博客 V2:07_YouOnlyLookOnce(YOLOv2)Better,Faster,Stronger-CSDN博客 1.2 简介 YOLOv3(You Only Look Once version 3)是…...
安装 Nuxt.js 的步骤和注意事项
title: 安装 Nuxt.js 的步骤和注意事项 date: 2024/6/17 updated: 2024/6/17 author: cmdragon excerpt: Nuxt.js在Vue.js基础上提供的服务器端渲染框架优势,包括提高开发效率、代码维护性和应用性能。指南详细说明了从环境准备、Nuxt.js安装配置到进阶部署技巧&…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP
编辑-虚拟网络编辑器-更改设置 选择桥接模式,然后找到相应的网卡(可以查看自己本机的网络连接) windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置,选择刚才配置的桥接模式 静态ip设置: 我用的ubuntu24桌…...
CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)
漏洞概览 漏洞名称:Apache Flink REST API 任意文件读取漏洞CVE编号:CVE-2020-17519CVSS评分:7.5影响版本:Apache Flink 1.11.0、1.11.1、1.11.2修复版本:≥ 1.11.3 或 ≥ 1.12.0漏洞类型:路径遍历&#x…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...
热烈祝贺埃文科技正式加入可信数据空间发展联盟
2025年4月29日,在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上,可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞,强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...
