SpringCloud(五)MQ消息队列
MQ
- 概念
- 常见消息模型
- helloworld案例
- 实现
- 实现spring AMQP发送消息
- 实现spring AMQP接收消息
- 工作消息队列
- 实现
- 发布订阅模型
- Fanout Exchange
- 实现
- DirectExchange
- 实现
- TopicExchange
- 实现
- DirectExchange 和FanoutExchange的差异
- DirectExchange 和TopicExchange的差异
- 基于@RabbitListener注解声明队列有 哪些常用注
- 消息转换器
- 注意
- 同步调用
- 异步调用
- 安装
- SpringAMQP
- 特征
概念
MQ(MessageQueue):消息队列,事件驱动架构中的Broker
- channel:操作MQ的工具
- exchange:路由消息到队列
- queue:缓存消息
- virtual host: 虚拟主机,是对queue、exchange等资源逻辑分组
常见消息模型
helloworld案例
角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
实现
实现spring AMQP发送消息
- 在父工程引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在publisher服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor
- 在publisher服务中新建一个测试类,编写测试方法
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue(){String queueName = "simple.queue";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);} }
实现spring AMQP接收消息
- 在父工程引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
在consumer服务中编写消费逻辑,监听simple.queue。
- 在consumer服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor
- 在publisher服务中新建一个测试类,编写测试方法
@Component public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");} }
工作消息队列
作用: 提高消息处理速度,避免队列消息堆积。
实现
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello,spring amqp!";for (int i = 0; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列
@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage1 (String msg) throws InterruptedException{System.out.println("spring 消费者1接收到消息:【"+msg+"】");Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage2 (String msg) throws InterruptedException{System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色Thread.sleep(200);
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
- 修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toorlistener:direct:prefetch: 1
发布订阅模型
概念: 与之前模型区别是,允许将同一消息发送给多个消费者。
实现方式: exchange(交换机)
exchange: 负责消息路由,不存储,路由失败则消息丢失
常见exchange类型:
- Fanout:广播
- Direct:路由
- Topic:话题
Fanout Exchange
Fanout Exchange:将接收到的消息路由到每一个跟其绑定的queue
实现
-
在consumer服务中,利用代码声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)
- SringAMQP提供了声明交换机、队列、绑定关系的API。
- 在consumer服务常见一个类,添加@configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding。
@Configuration public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("root.fanout");}//声明第一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);} //第二个同第一个 }
- SringAMQP提供了声明交换机、队列、绑定关系的API。
-
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.fanout发送消息
@Testpublic void testSendFanoutExchange(){String exchangeName = "root.fanout";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(exchangeName,message);}
DirectExchange
DirectExchange: 将接收到的消息更具规则路由到指定的Queue,因此称为路由模式(routes)
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
实现
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中编写两个消费者方法,分别监听direct.queue1和direct.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue1"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue2"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue2的消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.direct发送消息
@Testpublic void testSendDirectExchange(){String exchangeName = "root.direct";String message = "hello,red!";rabbitTemplate.convertAndSend(exchangeName,"red",message);}
TopicExchange
TopicExchange: 与DirectExchange类似,区别在于routineKey必须是多个单词的列表,并且以== . ==分割。
Queue与Exchange指定BindingKey时可以使用通配符
# :代指0个或多个单词
*:代指一个单词
实现
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue1"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue2"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue2的消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.topic发送消息
@Testpublic void testSendTopicExchange(){String exchangeName = "root.topic";String message = "hello,china.news!";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
DirectExchange 和FanoutExchange的差异
- FanoutExchange将消息路由给每一个与之绑定的队列
- DirectExchange根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
DirectExchange 和TopicExchange的差异
- TopicExchange的routineKey必须使用多个单词,以== . ==分割
- TopicExchange可以使用通配符
基于@RabbitListener注解声明队列有 哪些常用注
- @Queue
- @Exchange
消息转换器
设置JSON方式序列化:
发送消息
- 在publisher服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在publisher服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
接收消息
- 在consumer服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在consumer服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
- 定义消费者
@RabbitListener(queues = "object.queue")public void listenObjectQueueMessage(Map<String,Object> msg){System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色}
注意
MessageConverter默认是JDK序列化
接收方和发送方必须使用相同的MessageConverter
同步调用
优点: 时效性高
问题:
- 耦合度高
- 性能下降
- 资源浪费
- 级联失败
异步调用
实现方式:
- 事件驱动(常用)
优势:
- 服务解耦
- 性能提升,吞吐量提高
- 故障隔离。没有强依赖,不担心级联失败问题
- 流量削锋
缺点:
- 依赖Broker的可靠性、安全性、吞吐能力
- 架构复杂、业务没有明显的流程线,不好追踪管理
安装
docker pull rabbitmq:3-managementdocker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=toor \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
SpringAMQP
AMQP(Advance Message Queuing Protocol):是用于在应用程序或之间传递业务消息的开放标准,该协议与语言平台无关,更符合微服务中独立性的要求
Spring AMQP: 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分:
- spring-amqp:基础抽象
- spring-rabbit:底层默认实现
特征
- 监听器容器,用于异步处理入站消息
- 用于发送和接收消息的RabbitTemplate
- RabbitAdmin用于自动声明队列、交换和绑定
相关文章:

SpringCloud(五)MQ消息队列
MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注…...
SQL语法基础汇总
三年前的存稿 默认端口号 3306 超级用户名 root 登录 mysql -uroot -p / mysql -uroot -proot 退出 exit / quit 服务器版本 SELECT VERSION(); 当前日期 SELECT NOW(); 当前用户 SELECT USER(); 备份 mysqldump -uroot -p 数据库名称 > 保存的路径 还原 create database1-…...

惠普星14Pro电脑开机不了显示错误代码界面怎么办?
惠普星14Pro电脑开机不了显示错误代码界面怎么办?有用户电脑开机之后,进入了一个错误界面,里面有一些错误代码。重启电脑之后依然是无法进入到桌面中,那么这个情况怎么去进行解决呢?我们可以重装一个新系统,…...

顺序表的构造及功能
定义顺序表是一种随机存储都结构,其特点是表中的元素的逻辑顺序与物理顺序相同。假设线性表L存储起始位置为L(A),sizeof(ElemType)是每个数据元素所占的存储空间的大小,则线性表L所对应的顺序存储如下图。顺序表的优缺点优点:随机…...

cesium: 绘制线段(008)
第008个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中绘制线段,左键点击开始绘制,右键点击取消绘制 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共139行)相关API参考:专栏目标示例效果 配置方式 1)…...

HTML、CSS学习笔记4(3D转换、动画)
目录 一、空间转换(3D转换) 1.空间位移 语法: 取值:(正负均可) 透视: 2.空间旋转 3.立体呈现 二、动画(animation) 1.动画的使用 先定义动画 再调用定义好的动画 …...
java的分布式锁
什么是分布式锁 分布式锁是指分布式环境下,系统部署在多个机器中,实现多进程分布式互斥的一种锁。为了保证多个进程能看到锁,锁被存在公共存储(比如 Redis、Memcache、数据库等三方存储中),以实现多个进程并…...

17- TensorFlow实现手写数字识别 (tensorflow系列) (项目十七)
项目要点 模型创建: model Sequential()添加卷积层: model.add(Dense(32, activationrelu, input_dim100)) # 第一层需要 input_dim添加dropout: model.add(Dropout(0.2))添加第二次网络: model.add(Dense(512, activationrelu)) # 除了first, 其他层不要输入shape添加输出…...

Polkadot 基础
Polkadot Polkadot联合并保护了一个不断增长的专业区块链生态系统,称为parachains。Polkadot上的应用程序和服务可以安全地跨链通信,形成真正可互操作的去中心化网络的基础。 真正的互操作性 Polkadot支持跨区块链传输任何类型的数据或资产,…...

spring源码编译
spring源码编译1、安装gradle2、拉取源码3、配置gradle文件来源及镜像仓库4、预编译5、验证6、可能遇到的报错6.1、jdk.jfr不存在6.2、checkstyleMain6.3、org.gradle.api.artifacts.result.ComponentSelectionReason.getDescription()Ljava/lang/String6.4、其他jdk࿱…...
防盗链是什么?带你了解什么是防盗链
目录 什么是防盗链 防盗链的定义 防盗链的产生 防盗链的实现 什么是防盗链 防盗链其实就是采用服务器端编程,通过url过滤技术实现的防止盗链的软件。 比如:photo.abc.com/video.mp4 这个下载地址,如果没有装防盗链,别人就能轻…...

Linux基础命令-fdisk管理磁盘分区表
文章目录 fdisk 命令介绍 命令格式 基本参数 1)常用参数 2)fdisk菜单操作说明 创建一个磁盘分区 1)创建分区 2)创建交换分区 参考实例 1) 显示当前分区的信息 2) 显示每个磁盘的分区信息 命令…...

(四)K8S 安装 Nginx Ingress Controller
ingress-nginx 是 Kubernetes 的入口控制器,使用NGINX作为反向代理和负载均衡器 版本介绍 版本1:Ingress NGINX Controller(k8s社区的ingres-nginx) 以 NGINX 开源技术为基础(kubernetes.io),可在GitHub的 kubernet…...
高频面试题
MyISAM和InnoDB是MySQL两种常见的存储引擎,它们之间有以下几点区别: 事务支持:MyISAM不支持事务处理,而InnoDB支持事务处理。 行级锁:MyISAM只支持表级锁,而InnoDB支持行级锁,可以避免并发访问…...
js 字节数组操作,TCP协议组装
js字节数组,进制转换js基础知识数组 Array扩展操作符三个点(...)ArrayBufferslice() 数组复制reduce 对数组中的每个元素执行一个提供的函数,将其结果汇总为单个返回值splice 数组删除,添加,替换js 字节数组转数字以及…...
JavaScript的引入并执行-包含动态引入与静态引入
JavaScript的引入并执行-包含动态引入与静态引入 JavaScript引入方式 html文件需要引入JavaScript代码,才能在页面里使用JavaScript代码。 静态引入 行内式 直接在DOM标签上使用 <!DOCTYPE html> <html lang"en"> <head><meta ch…...

第四阶段01-酷鲨商城项目准备
1. 关于csmall-product项目 这是“酷鲨商城”大项目中的“商品管理”项目,是一个后台管理项目(给管理员,或运营人员使用的项目,并不是普通用户使用的),并且,只会涉及与发布商品可能相关的功能开…...

Uncaught ReferenceError: jQuery is not defined
今天在拉取项目部署到本地的时候遇到了一个问题特此记录一下 (以后闭坑) 我和同事同时拉取了一样的代码,结果同事的页面加载正常而我的页面像被狗啃了一样,知道是js的问题但是不知道问题出在哪里?后来还是同事帮我解决…...

面试阿里测开岗,被面试官针对,当场翻脸,把我的简历还给我,疑似被拉黑...
好家伙,金三银四一到,这奇葩事可真是多,前两天和粉丝聊天,他说前段时间面试阿里的测开岗,最后和面试官干起来了。 我问他为什么,他说没啥,就觉得面试官太装了,就爱问一些虚而不实的…...

2. 驱动开发--驱动开发环境搭建
文章目录前言一、Linux中配置编译环境1.1 linux下安装软件的方法1.2 交叉编译工具链的安装1.2.1 测试是否安装成功1.3 设置环境变量1.3.1 将工具链导出到环境变量1.4 为工具链创建arm-linux-xxx符号链接二、 搭建运行开发环境2.1 tftp网络方式加载内核和设备树文件2.2 nfs网络方…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...

如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...

接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...
多模态图像修复系统:基于深度学习的图片修复实现
多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...