当前位置: 首页 > news >正文

Spring Cloud Stream 消息驱动基础入门与实践总结

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。

【1】概念介绍

① 什么是Spring Cloud Stream

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。简单来讲,就是屏蔽了底层XXMQ,应用层不用关注底层是RabbitMQ还是Kafka 。类似于Spring Data抽离持久层屏蔽底层各种数据库的概念。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

官网文档:https://spring.io/projects/spring-cloud-stream#overview

在这里插入图片描述

② stream如何统一底层差异

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

在这里插入图片描述

③ Spring Cloud Stream标准流程设计

Stream中的消息通信方式遵循了发布-订阅模式。

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

在这里插入图片描述

④ 几个API注解

@EnableBinding:指信道channel和exchange绑定在一起。

@StreamListener:监听队列,用于消费者的队列的消息接收

@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。

@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序

下面以RabbitMQ为底层MQ来说明如何使用Stream,当然同样要先安装好RabbitMQ。

【2】消息生产者

① pom依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

② yml配置

server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbit
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: send-8001.com  #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址

③ 消息服务类

public interface IMessageProvider {public String send();
}@EnableBinding(Source.class)//定义消息推送管道
@Slf4j
public class IMessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;//消息发送通道@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());log.info(serial+"***********************");return serial;}
}

编写控制器发送消息:

@RestController
public class IMessageController {@Resourceprivate IMessageProvider provider;@GetMapping("/sendMessage")public String send(){return provider.send();}
}

【3】消息消费者

① pom依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

② yml配置

server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbitgroup: group1
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: receive-8002.com  #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址

③ 消息接收服务

@Component
@Slf4j
@EnableBinding(Sink.class)
public class StreamController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String>message){log.info("消费者1号接收到消息"+message.getPayload()+"\t port:"+serverPort);}}

上面【2】【3】即可实现消息的发送和接收,但是假设有多个消费者,我们还要考虑两个问题:消息的重复消费和消息的持久化。

【4】group分组解决重复消费问题

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。

在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

  • 不同组是可以全面消费的(重复消费),
  • 同一组内会发生竞争关系,只有其中一个可以消费。

也就是说,两个消费者微服务的group定义为同一个,即可以解决重复消费问题

【5】group分组解决消息丢失问题

即,在你服务停机重启期间,消息在不断发送,而服务启动后并没有接收到发送的消息。这是由于你没有配置group属性导致的。

解决方案:配置group属性

spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbitgroup: group1 # 这个很重要!!!

相关文章:

Spring Cloud Stream 消息驱动基础入门与实践总结

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架&#xff0c;该框架提供了一个灵活的编程模型&#xff0c;它建立在已经建立和熟悉的Spring熟语和最佳实践上&#xff0c;包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。…...

你好rust

第一次安装rust&#xff0c;记录一下笔记。 几年前就听说过rust&#xff0c;自己一直是个c爱好者&#xff0c;所以比较抵触rust&#xff0c;早年还有什么rust向上突破群。一直比较抵触&#xff0c;直到这几年rust已经渐渐深入到linux内核、云原生可观测以及zend社区当中&#x…...

STM32 printf 重定向到CAN

最近在调试一款电机驱动板 使用的是CAN总线而且板子上只有一个CAN 想移植Easylogger到上面试试easylogger的效果&#xff0c;先实现pritnf的重定向功能来打印输出 只需要添加以下代码即可实现 代码 #include <stdarg.h> uint8_t FDCAN_UserTxBuffer[512]; void FDCAN_p…...

jmeter性能优化之mysql监控sql慢查询语句分析

接上次博客&#xff1a;基础配置 多用户登录并退出jmx文件&#xff1a;百度网盘 提取码&#xff1a;0000 一、练习jmeter脚本检测mysql慢查询 随意找一个脚本(多用户登录并退出)&#xff0c;并发数设置300、500后分别查看mysql监控平台 启动后查看&#xff0c;主要查看mysql…...

海南聚广众达电子商务咨询有限公司引领行业变革

在数字化浪潮席卷全球的今天&#xff0c;电商行业正以前所未有的速度发展。海南聚广众达电子商务咨询有限公司&#xff0c;凭借其在抖音电商领域的深厚积累和不断创新&#xff0c;正逐步成为行业的佼佼者。这家以专注、专业、专注为核心理念的公司&#xff0c;不仅为客户提供全…...

Unity API学习之资源的动态加载

资源的动态加载 在实际游戏开发的更新换代中&#xff0c;随着开发的软件不断更新&#xff0c;我们在脚本中需要拖拽赋值的变量会变空&#xff0c;而要想重新拖拽又太花费时间&#xff0c;因此我们就需要用到Resources.Load<文件类型>("文件名")函数来在一开始…...

C++算法——回溯

回溯算法 实现思想 先看一个实例&#xff1a; //暴力枚举的算法 int n 5; for (int a 1; i < n; i) {for (int b 1; b < n; b){for (int c 1; c < n; c){for (int d 1; d < n; d){for (int e 1; e < n; e){//判断 abcde 是否互补相同if (a ! b &&a…...

java的深拷贝和浅拷贝

总结&#xff1a; 深拷贝&#xff1a;无论是基本类型还是引用类型都会创建新的实例。 浅拷贝&#xff1a;对于基本类型就是复制其值&#xff0c;对于引用类型则是复制了指向这些数据类型的内存地址。 浅拷贝&#xff08;Shallow Copy&#xff09; 浅拷贝是指在创建新对象时&am…...

AI产品经理,应掌握哪些技术?

美国的麻省理工学院&#xff08;Massachusetts Institute of Technology&#xff09;专门负责科技成果转化商用的部门研究表明&#xff1a; 每一块钱的科研投入&#xff0c;需要100块钱与之配套的投资&#xff08;人、财、物&#xff09;&#xff0c;才能把思想转化为产品&…...

同三维T80004EHL-W-4K30 4K HDMI编码器,支持WEBRTC协议

输入&#xff1a;1路HDMI1路3.5音频&#xff0c;1路HDMI环出1路3.5音频解嵌输出 4K30超高清,支持U盘/移动硬盘/TF卡录制&#xff0c;支持WEBRTC协议&#xff0c;超低延时&#xff0c;支持3个点外网访问 1个主流1个副流输出&#xff0c;可定制选配POE供电模块&#xff0c;WEBR…...

Hi3861 OpenHarmony嵌入式应用入门--点灯

本篇实现对gpio的控制&#xff0c;通过控制输出进行gpio的点灯操作。 硬件 我们来操作IO2&#xff0c;控制绿色的灯。 软件 GPIO API API名称 说明 hi_u32 hi_gpio_deinit(hi_void); GPIO模块初始化 hi_u32 hi_io_set_pull(hi_io_name id, hi_io_pull val); 设置某个IO…...

SaaS案例分享:成功构建销售渠道的实战经验

面对SaaS产品推广的难题&#xff0c;你是否曾感到迷茫&#xff0c;不知如何选择有效的销售渠道&#xff1f;Shopify独立站联盟营销或许能为你提供新的思路。Shopify作为领先的电商解决方案提供商&#xff0c;其独立站功能为众多商家提供了强大的在线销售平台。而联盟营销&#…...

密钥管理简介

首先我们要知道什么是密钥管理&#xff1f; 密钥管理是一种涉及生成、存储、使用和更新密钥的过程。 密钥的种类 我们知道&#xff0c;对称密码主要包括分组密码和序列密码。但有时也可以将杂凑函数和消息认证码划分为这一类&#xff0c;将它们的密钥称为对称密钥&#xff1b;…...

2024中国应急(消防)品牌巡展成都站成功召开!

汇聚品牌力量&#xff0c;共同相聚成都。6月14日&#xff0c;由中国安全产业协会指导&#xff0c;中国安全产业协会应急创新分会、应急救援产业网联合主办&#xff0c;四川省消防协会协办的“一切为了安全”2024年中国应急(消防)品牌巡展-成都站成功举办。该巡展旨在展示中国应…...

ansible-Role角色批量按照node_export节点,并追加信息到Prometheus文件中

文章目录 剧本功能 inventory.yaml文件定义deploy.yaml角色定义node_exporter_lock角色定义任务角色main.yamlnode_exporter_tasks.yml角色触发任务notifyextra_tasks.yml角色prometheus_node_config.j2模板文件 执行命令查看变量 剧本功能 功能1&#xff1a; 批量执行node_ex…...

求最小公倍数 、小球走过路程计算 题目

题目 JAVA11 求最小公倍数分析&#xff1a;代码&#xff1a;大佬代码&#xff1a; JAVA12 小球走过路程计算分析&#xff1a;代码&#xff1a; JAVA11 求最小公倍数 描述 编写一个方法&#xff0c;该方法的返回值是两个不大于100的正整数的最小公倍数。 输入描述&#xff1a;…...

【Android面试八股文】你能说一说为什么IO是耗时操作?

IO(输入/输出)操作之所以是耗时操作,主要是由于以下几个原因: 1. 物理设备的限制 机械动作:传统的硬盘驱动器(HDD)包含旋转的磁盘和移动的磁头,以读取或写入数据。这些机械动作需要时间完成。虽然固态硬盘(SSD)没有机械部件,但它们仍然受到电子信号传输速度的限制。…...

怎样增强 CLike 游戏的社交功能,促进玩家之间的互动和交流?

要增强CLike游戏的社交功能&#xff0c;以促进玩家之间的互动和交流&#xff0c;可以考虑以下几个方面&#xff1a; 添加聊天功能&#xff1a;在游戏中加入实时聊天功能&#xff0c;让玩家可以在游戏内互相交流。可以通过文本聊天或者语音聊天来实现。 社交平台集成&#xff1…...

12_YouOnlyLookOnce(YOLOv3)新一代实时目标检测技术

1.1 回顾V1和V2 V1&#xff1a;05_YouOnlyLookOnce(YOLOV1)目标检测领域的革命性突破-CSDN博客 V2&#xff1a;07_YouOnlyLookOnce(YOLOv2)Better&#xff0c;Faster&#xff0c;Stronger-CSDN博客 1.2 简介 YOLOv3&#xff08;You Only Look Once version 3&#xff09;是…...

安装 Nuxt.js 的步骤和注意事项

title: 安装 Nuxt.js 的步骤和注意事项 date: 2024/6/17 updated: 2024/6/17 author: cmdragon excerpt: Nuxt.js在Vue.js基础上提供的服务器端渲染框架优势&#xff0c;包括提高开发效率、代码维护性和应用性能。指南详细说明了从环境准备、Nuxt.js安装配置到进阶部署技巧&…...

【perl】环境搭建

1、Vscode Strawberry Perl 此过程与tcl环境搭建很类似&#xff0c;请参考我的这篇文章&#xff1a; 【vscode】 与 【tclsh】 联合搭建tcl开发环境_tclsh软件-CSDN博客 perl语言的解释器可以选择&#xff0c;strawberry perl。Strawberry Perl for Windows - Releases。 …...

【车载音视频AI电脑】全国产海事船载视频监控系统解决方案

海事船载视频监控系统解决方案针对我国快速发展的内河航运、沿海航运和远洋航运中存在的航行安全和航运监管难题&#xff0c;为船舶运营方、政府监管部门提供一套集视频采集、存储、回放调阅为一体的视频监控系统&#xff0c;对中大型船舶运行中的内部重要部位情况和外部环境进…...

Centos SFTP搭建

SFTP配置、连接及挂载教程_sftp连接-CSDN博客1、确认是否安装yum list installed | grep openssh-server 2、创建用户和组 sudo groupadd tksftpgroup sudo useradd -g tksftpgroup -d /home/www/tk_data -s /sbin/nologin tksftp01 sudo passwd tksftp013. 配置SFTP注意&a…...

【中学教资科目二】01教育基础

01教育基础 前言第一节 教育的产生与发展1.1 教育的起源 第二节 教育学的产生和发展2.1 中国教育学的发展2.2 西方教育学的发展2.3 独立及多样化阶段2.4 马克思教育学2.5 现代教育发展 第三节 教育与社会的发展3.1 教育与文化的关系 第四节 教育与人的发展、4.1 个体身心发展的…...

设计模式-享元模式Flyweight(结构型)

享元模式(Flyweight) 享元模式是一种结构型模式&#xff0c;它主要用于减少创建对象的数量&#xff0c;减少内存占用。通过重用现有对象的方式&#xff0c;如果未找到匹配对象则新建对象。线程池、数据库连接池、常量池等池化的思想就是享元模式的一种应用。 图解 角色 享元工…...

【刷题】LeetCode刷题汇总

目录 一、刷题题号1&#xff1a;两数之和 二、解法总结1. 嵌套循环2. 双指针 一、刷题 记录LeetCode力扣刷题 题号1&#xff1a;两数之和 双循环&#xff08;暴力解法&#xff09;&#xff1a; class Solution {public int[] twoSum(int[] nums, int target) {int[] listne…...

树莓派pico入坑笔记,快捷键键盘制作

使用usb_hid功能制作快捷键小键盘&#xff0c;定义了6个键&#xff0c;分别是 ctrlz ctrlv ctrlc ctrla ctrlw ctrln 对应引脚 board.GP4, board.GP8, board.GP13 board.GP28, board.GP20, board.GP17 需要用到的库&#xff0c;记得复制进单片机存储里面 然后是main主程…...

华为鲲鹏应用开发基础:鲲鹏处理器及关键硬件特性介绍(二)

1. 鲲鹏简介 1.1 鲲鹏处理器简介 鲲鹏处理器是华为自研的基于ARMv8指令集开发的数据中心级处理器 1.2 基于鲲鹏主板的多样化计算产品 1.3 基于鲲鹏920的华为TaiShan(泰山) 200服务器 1.3.1 TaiShan 2280服务器内部视图 1.3.2 TaiShan 2280服务器物理结构 1.3.3 TaiShan 2280服…...

Vue.js结合ASP.NET Core构建用户登录与权限验证系统

1. 环境准备2. 创建项目3. Vue配置步骤一: 安装包步骤二: 配置文件步骤三: 页面文件 4. 后台配置 在本教程中&#xff0c;我将利用Visual Studio 2022的强大集成开发环境&#xff0c;结合Vue.js前端框架和ASP.NET Core后端框架&#xff0c;从头开始创建一个具备用户登录与权限验…...

【html】如何利用id选择器实现主题切换

今天给大家介绍一种方法来实现主题切换的效果 效果图&#xff1a; 源码&#xff1a; <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initia…...

不停弹窗的网站怎么做/什么叫友情链接

雷音是阿里巴巴研究员、淘系技术部 FashionAI 负责人&#xff0c;在淘系技术嘉年华硅谷站&#xff0c;他分享了《时尚电商新赛道— FashionAI 中的技术》 &#xff0c;旨在揭秘&#xff1a;从面向机器学习的知识重建切入&#xff0c;提出了在 AI 能力的推动下&#xff0c;让人值…...

工信部网站域名备案查询/网络优化是干什么的

首先强调一下struts2的线程程安全&#xff0c;在Struts2中大量采用ThreadLocal线程局部变量的方法来保证线程的安全,像Dispatcher等都是通过ThreadLocal来保存变量值,使得每个线程都有自己独立的实例变量,互不相干. 接下来就从Dispatcher开始看起&#xff0c;先看其构造函数&am…...

网站数据维护/seo怎么做优化

本来准备上微软的SCOM 演示下system center2016的&#xff0c;顺便下一个windows 2016 RS15G的文件&#xff0c;2个小时过去了&#xff0c;还有2小时看样子绝对要过夜啊。然后搜了下其他开源的&#xff0c;发现了这货。Overview然后找了老牌监控软件MRTG, 全程Multi Router Tra…...

最大的网站建设/免费注册网页网址

目录 1、Collections工具类 2、Stack子类 1、Collections工具类 Collections是专为集合服务的工具类&#xff0c;可以进行List、Set、Map等集合的操作&#xff0c;比较有用 的方法如下&#xff1a; 1&#xff09;批量添加 public static <T> boolean addAll(Recently…...

建筑设计地图网站/企业宣传软文范例

文章目录封装&#xff08;Encapsulation&#xff09;继承&#xff08;inheritance&#xff09;多态&#xff08;polymorphism&#xff09;深入理解多态和继承多重继承的弊端封装&#xff08;Encapsulation&#xff09; 一种将抽象性函式接口的实现细节部分包装、隐藏起来的方法…...

阿克苏网站建设优化/品牌策划与推广方案

LayUI tab选项卡 分页展示 实现 Tab选项卡切换显示对应数据 要求&#xff1a;实现tab选项卡改变的同时展示数据也跟着改变 实现条件&#xff1a; 1、选项卡【官网 – 文档/示例 – 页面元素 – 选项卡】 2、数据表格【官网 – 文档/示例 – 内置模块 – 数据表格】 3、分页【官…...