当前位置: 首页 > news >正文

SpringCloud(五)MQ消息队列

MQ

  • 概念
  • 常见消息模型
    • helloworld案例
      • 实现
        • 实现spring AMQP发送消息
        • 实现spring AMQP接收消息
    • 工作消息队列
      • 实现
    • 发布订阅模型
      • Fanout Exchange
        • 实现
      • DirectExchange
        • 实现
      • TopicExchange
        • 实现
      • DirectExchange 和FanoutExchange的差异
      • DirectExchange 和TopicExchange的差异
    • 基于@RabbitListener注解声明队列有 哪些常用注
  • 消息转换器
    • 注意
  • 同步调用
  • 异步调用
  • 安装
  • SpringAMQP
    • 特征

概念

MQ(MessageQueue):消息队列,事件驱动架构中的Broker
在这里插入图片描述

  • channel:操作MQ的工具
  • exchange:路由消息到队列
  • queue:缓存消息
  • virtual host: 虚拟主机,是对queue、exchange等资源逻辑分组

常见消息模型

在这里插入图片描述在这里插入图片描述

helloworld案例

角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息
    在这里插入图片描述

实现

实现spring AMQP发送消息

  • 在父工程引入spring-amqp的依赖
        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

    • 在publisher服务中编写application.yml,添加mq连接信息
    spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor
    
    • 在publisher服务中新建一个测试类,编写测试方法
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue(){String queueName = "simple.queue";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}
    }
    

实现spring AMQP接收消息

  • 在父工程引入spring-amqp的依赖
        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 在consumer服务中编写消费逻辑,监听simple.queue。

    • 在consumer服务中编写application.yml,添加mq连接信息
    spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor
    
    • 在publisher服务中新建一个测试类,编写测试方法
    @Component
    public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}
    }

工作消息队列

作用: 提高消息处理速度,避免队列消息堆积。
在这里插入图片描述

实现

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
	  @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello,spring amqp!";for (int i = 0; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
  1. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
    @RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage1 (String msg) throws InterruptedException{System.out.println("spring 消费者1接收到消息:【"+msg+"】");Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage2 (String msg) throws InterruptedException{System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色Thread.sleep(200);
  1. 消费者1每秒处理50条消息,消费者2每秒处理10条消息
    • 修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toorlistener:direct:prefetch: 1

发布订阅模型

概念: 与之前模型区别是,允许将同一消息发送给多个消费者。
实现方式: exchange(交换机)
exchange: 负责消息路由,不存储,路由失败则消息丢失
常见exchange类型

  • Fanout:广播
  • Direct:路由
  • Topic:话题

在这里插入图片描述

Fanout Exchange

Fanout Exchange:将接收到的消息路由到每一个跟其绑定的queue

实现

  1. 在consumer服务中,利用代码声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)

    1. SringAMQP提供了声明交换机、队列、绑定关系的API。
      在这里插入图片描述
    2. 在consumer服务常见一个类,添加@configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding。
    @Configuration
    public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("root.fanout");}//声明第一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
    //第二个同第一个
    }
    
  2. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}
  1. 在publisher中编写测试方法,向root.fanout发送消息
    @Testpublic void testSendFanoutExchange(){String exchangeName = "root.fanout";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(exchangeName,message);}

DirectExchange

DirectExchange: 将接收到的消息更具规则路由到指定的Queue,因此称为路由模式(routes)

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
    在这里插入图片描述

实现

  1. 利用@RabbitListener声明Exchange、Queue、RoutingKey
  2. 在consumer服务中编写两个消费者方法,分别监听direct.queue1和direct.queue2
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue1"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue2"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue2的消息:【"+msg+"】");}
  1. 在publisher中编写测试方法,向root.direct发送消息
    @Testpublic void testSendDirectExchange(){String exchangeName = "root.direct";String message = "hello,red!";rabbitTemplate.convertAndSend(exchangeName,"red",message);}

TopicExchange

TopicExchange: 与DirectExchange类似,区别在于routineKey必须是多个单词的列表,并且以== . ==分割。

Queue与Exchange指定BindingKey时可以使用通配符
# :代指0个或多个单词
*:代指一个单词
在这里插入图片描述

实现

  1. 利用@RabbitListener声明Exchange、Queue、RoutingKey
  2. 在consumer服务中编写两个消费者方法,分别监听topic.queue1和topic.queue2
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue1"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue2"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue2的消息:【"+msg+"】");}
  1. 在publisher中编写测试方法,向root.topic发送消息
    @Testpublic void testSendTopicExchange(){String exchangeName = "root.topic";String message = "hello,china.news!";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

DirectExchange 和FanoutExchange的差异

  • FanoutExchange将消息路由给每一个与之绑定的队列
  • DirectExchange根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

DirectExchange 和TopicExchange的差异

  • TopicExchange的routineKey必须使用多个单词,以== . ==分割
  • TopicExchange可以使用通配符

基于@RabbitListener注解声明队列有 哪些常用注

  • @Queue
  • @Exchange

消息转换器

设置JSON方式序列化
发送消息

  • 在publisher服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
  • 在publisher服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}

接收消息

  • 在consumer服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
  • 在consumer服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
  • 定义消费者
    @RabbitListener(queues = "object.queue")public void listenObjectQueueMessage(Map<String,Object> msg){System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色}

注意

MessageConverter默认是JDK序列化
接收方和发送方必须使用相同的MessageConverter

同步调用

优点: 时效性高
问题:

  • 耦合度高
  • 性能下降
  • 资源浪费
  • 级联失败

异步调用

实现方式:

  • 事件驱动(常用)

优势:

  • 服务解耦
  • 性能提升,吞吐量提高
  • 故障隔离。没有强依赖,不担心级联失败问题
  • 流量削锋

缺点:

  • 依赖Broker的可靠性、安全性、吞吐能力
  • 架构复杂、业务没有明显的流程线,不好追踪管理

安装

docker pull rabbitmq:3-managementdocker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=toor \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

SpringAMQP

AMQP(Advance Message Queuing Protocol):是用于在应用程序或之间传递业务消息的开放标准,该协议与语言平台无关,更符合微服务中独立性的要求

Spring AMQP: 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分:

  • spring-amqp:基础抽象
  • spring-rabbit:底层默认实现

特征

  • 监听器容器,用于异步处理入站消息
  • 用于发送和接收消息的RabbitTemplate
  • RabbitAdmin用于自动声明队列、交换和绑定

相关文章:

SpringCloud(五)MQ消息队列

MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注…...

SQL语法基础汇总

三年前的存稿 默认端口号 3306 超级用户名 root 登录 mysql -uroot -p / mysql -uroot -proot 退出 exit / quit 服务器版本 SELECT VERSION(); 当前日期 SELECT NOW(); 当前用户 SELECT USER(); 备份 mysqldump -uroot -p 数据库名称 > 保存的路径 还原 create database1-…...

惠普星14Pro电脑开机不了显示错误代码界面怎么办?

惠普星14Pro电脑开机不了显示错误代码界面怎么办&#xff1f;有用户电脑开机之后&#xff0c;进入了一个错误界面&#xff0c;里面有一些错误代码。重启电脑之后依然是无法进入到桌面中&#xff0c;那么这个情况怎么去进行解决呢&#xff1f;我们可以重装一个新系统&#xff0c…...

顺序表的构造及功能

定义顺序表是一种随机存储都结构&#xff0c;其特点是表中的元素的逻辑顺序与物理顺序相同。假设线性表L存储起始位置为L(A)&#xff0c;sizeof(ElemType)是每个数据元素所占的存储空间的大小&#xff0c;则线性表L所对应的顺序存储如下图。顺序表的优缺点优点&#xff1a;随机…...

cesium: 绘制线段(008)

第008个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中绘制线段,左键点击开始绘制,右键点击取消绘制 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共139行)相关API参考:专栏目标示例效果 配置方式 1)…...

HTML、CSS学习笔记4(3D转换、动画)

目录 一、空间转换&#xff08;3D转换&#xff09; 1.空间位移 语法&#xff1a; 取值&#xff1a;&#xff08;正负均可&#xff09; 透视&#xff1a; 2.空间旋转 3.立体呈现 二、动画&#xff08;animation&#xff09; 1.动画的使用 先定义动画 再调用定义好的动画 …...

java的分布式锁

什么是分布式锁 分布式锁是指分布式环境下&#xff0c;系统部署在多个机器中&#xff0c;实现多进程分布式互斥的一种锁。为了保证多个进程能看到锁&#xff0c;锁被存在公共存储&#xff08;比如 Redis、Memcache、数据库等三方存储中&#xff09;&#xff0c;以实现多个进程并…...

17- TensorFlow实现手写数字识别 (tensorflow系列) (项目十七)

项目要点 模型创建: model Sequential()添加卷积层: model.add(Dense(32, activationrelu, input_dim100)) # 第一层需要 input_dim添加dropout: model.add(Dropout(0.2))添加第二次网络: model.add(Dense(512, activationrelu)) # 除了first, 其他层不要输入shape添加输出…...

Polkadot 基础

Polkadot Polkadot联合并保护了一个不断增长的专业区块链生态系统&#xff0c;称为parachains。Polkadot上的应用程序和服务可以安全地跨链通信&#xff0c;形成真正可互操作的去中心化网络的基础。 真正的互操作性 Polkadot支持跨区块链传输任何类型的数据或资产&#xff0c;…...

spring源码编译

spring源码编译1、安装gradle2、拉取源码3、配置gradle文件来源及镜像仓库4、预编译5、验证6、可能遇到的报错6.1、jdk.jfr不存在6.2、checkstyleMain6.3、org.gradle.api.artifacts.result.ComponentSelectionReason.getDescription()Ljava/lang/String6.4、其他jdk&#xff1…...

防盗链是什么?带你了解什么是防盗链

目录 什么是防盗链 防盗链的定义 防盗链的产生 防盗链的实现 什么是防盗链 防盗链其实就是采用服务器端编程&#xff0c;通过url过滤技术实现的防止盗链的软件。 比如&#xff1a;photo.abc.com/video.mp4 这个下载地址&#xff0c;如果没有装防盗链&#xff0c;别人就能轻…...

Linux基础命令-fdisk管理磁盘分区表

文章目录 fdisk 命令介绍 命令格式 基本参数 1&#xff09;常用参数 2&#xff09;fdisk菜单操作说明 创建一个磁盘分区 1&#xff09;创建分区 2&#xff09;创建交换分区 参考实例 1&#xff09; 显示当前分区的信息 2&#xff09; 显示每个磁盘的分区信息 命令…...

(四)K8S 安装 Nginx Ingress Controller

ingress-nginx 是 Kubernetes 的入口控制器&#xff0c;使用NGINX作为反向代理和负载均衡器 版本介绍 版本1&#xff1a;Ingress NGINX Controller(k8s社区的ingres-nginx) 以 NGINX 开源技术为基础&#xff08;kubernetes.io&#xff09;&#xff0c;可在GitHub的 kubernet…...

高频面试题

MyISAM和InnoDB是MySQL两种常见的存储引擎&#xff0c;它们之间有以下几点区别&#xff1a; 事务支持&#xff1a;MyISAM不支持事务处理&#xff0c;而InnoDB支持事务处理。 行级锁&#xff1a;MyISAM只支持表级锁&#xff0c;而InnoDB支持行级锁&#xff0c;可以避免并发访问…...

js 字节数组操作,TCP协议组装

js字节数组&#xff0c;进制转换js基础知识数组 Array扩展操作符三个点&#xff08;...&#xff09;ArrayBufferslice() 数组复制reduce 对数组中的每个元素执行一个提供的函数,将其结果汇总为单个返回值splice 数组删除&#xff0c;添加&#xff0c;替换js 字节数组转数字以及…...

JavaScript的引入并执行-包含动态引入与静态引入

JavaScript的引入并执行-包含动态引入与静态引入 JavaScript引入方式 html文件需要引入JavaScript代码&#xff0c;才能在页面里使用JavaScript代码。 静态引入 行内式 直接在DOM标签上使用 <!DOCTYPE html> <html lang"en"> <head><meta ch…...

第四阶段01-酷鲨商城项目准备

1. 关于csmall-product项目 这是“酷鲨商城”大项目中的“商品管理”项目&#xff0c;是一个后台管理项目&#xff08;给管理员&#xff0c;或运营人员使用的项目&#xff0c;并不是普通用户使用的&#xff09;&#xff0c;并且&#xff0c;只会涉及与发布商品可能相关的功能开…...

Uncaught ReferenceError: jQuery is not defined

今天在拉取项目部署到本地的时候遇到了一个问题特此记录一下 &#xff08;以后闭坑&#xff09; 我和同事同时拉取了一样的代码&#xff0c;结果同事的页面加载正常而我的页面像被狗啃了一样&#xff0c;知道是js的问题但是不知道问题出在哪里&#xff1f;后来还是同事帮我解决…...

面试阿里测开岗,被面试官针对,当场翻脸,把我的简历还给我,疑似被拉黑...

好家伙&#xff0c;金三银四一到&#xff0c;这奇葩事可真是多&#xff0c;前两天和粉丝聊天&#xff0c;他说前段时间面试阿里的测开岗&#xff0c;最后和面试官干起来了。 我问他为什么&#xff0c;他说没啥&#xff0c;就觉得面试官太装了&#xff0c;就爱问一些虚而不实的…...

2. 驱动开发--驱动开发环境搭建

文章目录前言一、Linux中配置编译环境1.1 linux下安装软件的方法1.2 交叉编译工具链的安装1.2.1 测试是否安装成功1.3 设置环境变量1.3.1 将工具链导出到环境变量1.4 为工具链创建arm-linux-xxx符号链接二、 搭建运行开发环境2.1 tftp网络方式加载内核和设备树文件2.2 nfs网络方…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具

作者&#xff1a;来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗&#xff1f;了解下一期 Elasticsearch Engineer 培训的时间吧&#xff01; Elasticsearch 拥有众多新功能&#xff0c;助你为自己…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例

使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件&#xff0c;常用于在两个集合之间进行数据转移&#xff0c;如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model&#xff1a;绑定右侧列表的值&…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

06 Deep learning神经网络编程基础 激活函数 --吴恩达

深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

Bean 作用域有哪些?如何答出技术深度?

导语&#xff1a; Spring 面试绕不开 Bean 的作用域问题&#xff0c;这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开&#xff0c;结合典型面试题及实战场景&#xff0c;帮你厘清重点&#xff0c;打破模板式回答&#xff0c…...