RabbitMQ快速入门(MQ的概念、安装RabbitMQ、在 SpringBoot 项目中集成 RabbitMQ )
文章目录
- 1. 补充知识:同步通讯和异步通讯
- 1.1 同步通讯
- 1.2 异步通讯
- 2. 同步调用的缺点
- 2.1 业务耦合
- 2.2 性能较差
- 2.3 级联失败
- 3. 什么情况下使用同步调用
- 4. 异步调用
- 5. 异步调用的优点和缺点
- 5.1 异步调用的优点
- 5.1.1 解除耦合,拓展性强
- 5.1.2 无需等待,性能好
- 5.1.3 故障隔离
- 5.1.4 削峰填谷
- 5.2 异步调用的缺点
- 5.2.1 不能得到调用结果
- 5.2.2 不确定下游业务执行是否成功
- 5.2.3 业务安全依赖于消息代理的可靠性
- 6. MQ 的技术选型
- 7. 安装 RabbitMQ 并启动 RabbitMQ
- 7.1 搜索 RabbitMQ 镜像
- 7.2 下载 RabbitMQ 镜像
- 7.3 启动 RabbitMQ
- 7.4 访问 RabbitMQ 的管理页面
- 7.5 可能遇到的问题
- 7.5.1 安全组和防火墙未开放端口
- 7.5.2 RabbitMQ 没有安装 Web 插件
- 8. RabbitMQ 的整体架构和核心概念
- 9. RabbitMQ 快速入门
- 9.1 新建队列
- 9.2 绑定队列与交换机
- 9.3 发送消息
- 9.4 可能遇到的问题
- 10. 数据隔离
- 10.1 新建用户
- 10.2 为新用户创建一个 VirtualHost
- 10.3 测试不同 VirtualHost 之间是否有数据隔离
- 11. 在 SpringBoot 项目中集成 RabbitMQ
- 11.1 AMQP 和 SpringAMQP
- 11.2 快速入门
- 11.2.1 引入 Maven 依赖
- 11.2.2 编写与 RabbitMQ 有关的配置信息
- 11.3 完成一个简单的案例
- 11.3.1 创建队列
- 11.3.2 发送消息
- 11.3.3 接收消息
- 12. Work Queues 模型
- 12.1 Work Queues 的概念
- 12.2 Work Queues 模型的消息推送机制
- 13. 交换机
- 13.1 Fanout 交换机
- 13.1.1 Fanout 交换机的概念
- 13.1.2 快速上手
- 13.2 Direct 交换机
- 13.2.1 Direct 交换机的概念
- 13.2.2 快速上手
- 13.3 Topic 交换机(推荐使用)
- 13.3.1 Topic 交换机的概念
- 13.3.2 快速上手
- 14. 在 SpringBoot 项目中声明队列和交换机的方式
- 14.1 编程式声明
- 14.1.1 SpringAQMP提供的创建队列和交换机的类
- 14.1.2 快速上手
- 14.1.3 编程式声明的缺点
- 14.2 注解式声明(推荐使用)
- 15. 消息转换器
- 15.1 默认的消息转换器
- 15.2 自定义消息转换器
1. 补充知识:同步通讯和异步通讯
1.1 同步通讯
同步通讯是指发送方在发送消息后,会等待接收方的回应,直到收到回应后才会继续执行后续操作
同步通讯的特点是:
- 阻塞:发送方在等待回应期间会被阻塞,无法进行其他操作
- 顺序执行:消息的处理是按照发送和接收的顺序进行的,确保了消息的时序性
- 实时反馈:发送方可以立即得到接收方的回应,适用于需要立即确认的场景
- 占用资源:由于需要等待,可能会造成资源的浪费,如线程阻塞
打电话就是一个典型的同步通讯例子,通话双方必须实时交流,一方说话时,另一方必须等待
1.2 异步通讯
异步通讯是指发送方在发送消息后,不需要等待接收方的立即回应,就可以继续执行其他操作。接收方在处理完消息后,可能会在未来的某个时间点给出回应
异步通讯的特点是:
- 非阻塞:发送方在发送消息后可以立即继续其他工作,不会因为等待回应而被阻塞
- 解耦:发送方和接收方在时间上解耦,可以独立处理各自的任务
- 灵活:异步通讯可以处理更复杂的通信模式,如消息队列、事件驱动等
- 资源利用率高:更高效地利用资源,因为不需要等待,可以提高系统的吞吐量
电子邮件是一个异步通讯的例子,你可以发送一封邮件后继续做其他事情,收件人可以在任何时间回复邮件(微信聊天也是一个异步通讯的例子)
2. 同步调用的缺点
我们以支付业务
为例分析同步调用的缺点
支付业务采用的是同步调用的方式,因为我们在执行更新支付状态
操作和更新订单状态
之前,需要先知道扣减余额
操作的结果,这种同步调用方式存在几个问题
2.1 业务耦合
第一个问题就是业务耦合的问题,对于支付服务来说,最重要的一件事就是扣减用户的余额,然后更新支付状态
后续的更新订单状态
操作跟支付服务是没什么关系的,但是支付成功之后确实需要更新订单状态,所以支付服务不得不调用交易服务来更新订单状态
那有同学就说了,我在支付服务里面加一行代码不就可以调用交易服务了吗,听起来没什么问题,但是业务是会变化的,产品经理的脑洞你也是想象不到的
想象一下,产品经理提了一个新的需求,用户支付成功之后要发一个短信通知用户,产品经理一提需求,我们就要更改源代码
某一天,产品经理又提了一个新需求,用户支付成功之后,要为用户增加一定的积分
这种同步调用的方式拓展性比较差,不符合面向对象编程中的开闭原则
2.2 性能较差
如果采用同步调用的方式,支付服务需要等待其它所有服务完成操作,耗时会大大增长,十分影响用户的体验
2.3 级联失败
想象一下,交易服务出现故障了,而这个故障迟迟没有得到解决,最终就很有可能拖垮支付服务,导致支付服务的资源被耗尽,也出现故障,出现级联失败的情况
3. 什么情况下使用同步调用
经过上面的分析,有同学可能会有这样的疑问:既然同步调用有这么多问题,为什么我们还要用同步调用呢,什么情况下使用同步调用呢
一般来说,使用同步调用的场景都有一个特点:下一步操作依赖于上一步操作的结果
以上面的支付业务为例,交易服务、通知服务、积分服务都依赖于支付服务的结果
当支付服务成功扣减用户余额并成功更新支付状态之后,交易服务、通知服务、积分服务就可以开始执行相应的操作了
然而,通知服务不依赖于交易服务,积分服务也不依赖于通知服务
在成功扣减用户余额并成功更新支付状态之后,支付业务就已经完成了
所以说,支付服务完成了之后,只需要通知交易服务、通知服务、积分服务执行相应的操作,而不需要等待交易服务、通知服务、积分服务都完成之后再返回结果
4. 异步调用
异步调用基于消息通知,一般包含三个角色消息
- 发送者:投递消息的人
- 消息代理:管理、暂存、转发消息的人
- 消息接收者:接收和处理消息的人
改为异步调用之后,支付服务不再同步调用与支付业务关联度低的服务,而是发送消息通知于支付业务关联度低的服务
5. 异步调用的优点和缺点
5.1 异步调用的优点
5.1.1 解除耦合,拓展性强
即使以后有新业务拓充,支付服务只需要发送一条消息给消息代理,让消息代理通知新业务,拓展性强
5.1.2 无需等待,性能好
支付服务完成之后只需要发送消息给消息代理,让消息代理通知其它服务
5.1.3 故障隔离
即使交易服务出现了故障,也不会影响到支付服务
5.1.4 削峰填谷
假如支付服务正在面临着很大的压力,流量时高时低(呈波浪形)。如果采用同步调用的方式,当流量很高的时候,交易服务、通知服务、积分服务可能扛不住
但如果采用异步调用的方式,就很少会出现交易服务、通知服务、积分服务扛不住的情况。为什么呢,因为消息代理容量很大。在高并发的情况下,用户每成功支付一次,支付服务只需要发送一条消息给消息代理,这些像洪水一般的消息都会被消息代理拦住
消息代理会保存这些消息,后续服务可以根据自己的处理速度,从消息代理中一条一条地取出信息并处理。这样一来,后续服务承受的压力将会变得很平缓
5.2 异步调用的缺点
5.2.1 不能得到调用结果
异步调用一般是通知对方执行某个操作,无法知道对方执行操作后的结果
5.2.2 不确定下游业务执行是否成功
异步调用通知下游业务后,无法知道下游业务是否执行成功
5.2.3 业务安全依赖于消息代理的可靠性
下游业务的执行依赖于消息代理的可靠性,一旦消息代理出现故障,下游业务将无法执行
6. MQ 的技术选型
MQ:Message Queue,消息队列
以下是当前主流的消息队列
在这里重点提一下 Kafka ,Kafka 的吞吐量非常高,适合大规模日志场景
目前国内大部分公司采用的都是 RabbitMQ
7. 安装 RabbitMQ 并启动 RabbitMQ
RabbitMQ是基于 Erlang 语言开发的开源消息通信中间件(官网:RabbitMQ)
我们基于 docker 安装 RabbitMQ
7.1 搜索 RabbitMQ 镜像
sudo docker search rabbitmq
7.2 下载 RabbitMQ 镜像
sudo docker pull rabbitmq
检查 RabbitMQ 镜像是否下载成功
sudo docker images
7.3 启动 RabbitMQ
sudo docker run \-e RABBITMQ_DEFAULT_USER=wuyanzu \-e RABBITMQ_DEFAULT_PASS=bhoLdSvpd0UAOysh \-v rabbitmq-plugins:/plugins \--name rabbitmq \--hostname rabbitmq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:latest
指令说明:
-
sudo docker run
: 基本的Docker命令,用于启动一个新的容器实例 -
-e RABBITMQ_DEFAULT_USER=wuyanzu
: 设置RabbitMQ服务的默认用户名为wuyanzu
-
-e RABBITMQ_DEFAULT_PASS=kZoeSW$$xS5i^Cum
: 设置RabbitMQ服务的默认密码为bhoLdSvpd0UAOysh
-
-v rabbitmq-plugins:/plugins
: 将一个名为rabbitmq-plugins
的卷映射到容器的/plugins
目录,用于存放RabbitMQ的插件。这里的rabbitmq-plugins
是一个卷的名称,而不是宿主机的路径 -
--name rabbitmq
: 指定容器的名称为rabbitmq
-
--hostname rabbitmq
: 设置容器的主机名为rabbitmq
-
-p 15672:15672
: 将宿主机的端口15672
映射到容器的端口15672
,这是RabbitMQ管理界面的默认端口 -
-p 5672:5672
: 将宿主机的端口5672
映射到容器的端口5672
,这是RabbitMQ用于AMQP协议通信的默认端口 -
-d
: 在后台运行容器(守护进程) -
rabbitmq:latest
: 使用最新的RabbitMQ官方镜像来创建容器
7.4 访问 RabbitMQ 的管理页面
接下来进入 RabbitMQ 的管理界面,在浏览器输入以下地址(将 IP 地址换成你的虚拟机的 IP 地址)
http://127.0.0.1:15672/
输入用户名和密码后进入到 RabbitMQ 的管理页面
7.5 可能遇到的问题
7.5.1 安全组和防火墙未开放端口
如果无法进入RabbitMQ的管理界面,记得先在安全组和防火墙中开放 15672 和 5672 端口
在 Ubuntu 中开放15672 和 5672 端口
sudo ufw allow 15672
sudo ufw allow 5672
sudo ufw reload
在 CentOS 中开放15672 和 5672 端口
sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
sudo firewall-cmd --reload
7.5.2 RabbitMQ 没有安装 Web 插件
如果开放防火墙端口后还是无法访问 RabbitMQ 的管理界面,可能是安装 RabbitMQ 没有安装 Web 插件
以下是 RabbitMQ 安装 Web 插件的方法
第一步:进入容器内部
sudo docker exec -it rabbitmq bash
第二步:安装 Web 插件
rabbitmq-plugins enable rabbitmq_management
安装插件后退出容器内部
exit
8. RabbitMQ 的整体架构和核心概念
RabbitMQ 有几个核心概念:
- Publisher:消息发送者
- Consumer:消息的消费者
- Queue:消息队列,存储消息
- Exchange:交换机,负责路由消息
- VirtualHost:虚拟主机,用于数据隔离
RabbitMQ 的整体架构
9. RabbitMQ 快速入门
注意事项:交换机只能路由和转发消息,不能存储消息
9.1 新建队列
创建一个名为 hello.queue 的队列
9.2 绑定队列与交换机
我们将刚才新创建的 hello.queue 队列与 amq.fanout 交换机绑定(fanout意为扇出)
绑定成功后的界面
9.3 发送消息
我们在 amq.fanout 交换机中发送一条消息,消息的内容为 Hello, RabbitMQ!
发送消息后,查看交换机的总览信息
查看队列中的消息数
查看消息的具体内容
9.4 可能遇到的问题
如果你发现
- 交换机的 overview 页面没有折线图
- Queues 页面也没有与消息相关的信息
- 点击
channels
后出现Stats in management UI are disabled on this node
信息
需要先修改 RabbitMQ的 配置
第一步:进入容器内部
sudo docker exec -it rabbitmq bash
第二步:修改配置
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
第三步:重启容器
先退出容器内部
exit
再重启容器
sudo docker restart rabbitmq
10. 数据隔离
10.1 新建用户
新建一个名为 CaiXuKun 的用户,密码为 T1rhFXMGXIOYCoyi ,角色指定为 admin
可以看到,新用户对任意一个 VirtualHost 都是没有访问权限的
用新用户的账号登录管理台,虽然能看到所有 VirtualHost 的信息,但是无法对任意一个 VirtualHost 进行操作
10.2 为新用户创建一个 VirtualHost
用新用户的账号登录管理台,创建一个名为 /blog 的 VirtualHost
10.3 测试不同 VirtualHost 之间是否有数据隔离
可以看到,不同的 VirtualHost 之间有不同的交换机
对某一个 VirtualHost 操作不会影响到另一个 VirtualHost
11. 在 SpringBoot 项目中集成 RabbitMQ
后端环境:
- SpringBoot:3.0.2
- JDK:17.0.7
11.1 AMQP 和 SpringAMQP
SpringAMQP 的官网:Spring AMQP
11.2 快速入门
新建一个 SpringBoot 项目,并创建 consumer 和 publisher 两个子模块,项目的整体结构如下
11.2.1 引入 Maven 依赖
在父工程中引入 SpringAMQP 的依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
11.2.2 编写与 RabbitMQ 有关的配置信息
在 consumer 和 publisher 模块的 application.yml 中分别编写与 RabbitMQ 有关的配置信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /blogusername: CaiXuKunpassword: T1rhFXMGXIOYCoyi
11.3 完成一个简单的案例
案例要求如下:
- 在 RabbitMQ 的控制台中创建名为 simple.queue 的队列(队列归属的 VirtualHost 为 /blog)
- 在 publisher 模块中,利用 SpringAMQP 直接向 simple.queue 队列发送消息
- 在 consumer 服务中,利用 SpringAMQP 编写消费者,监听 simple.queue 队列
11.3.1 创建队列
11.3.2 发送消息
在 publisher 模块中编写测试类,用户向队列发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest(classes = PublisherApplication.class)
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessageToQueue() {String queueName = "simple.queue";String msg = "Hello, SpringAMQP!";rabbitTemplate.convertAndSend(queueName, msg);}}
在 RabbitMQ 的控制台可以看到,消息成功发送
11.3.3 接收消息
SpringAMQP 提供了声明式的消息监听,我们只需要通过@RabbitListener
注解在方法上声明要监听的队列名称,将来 SpringAMQP 就会把消息传递给使用了@RabbitListener
注解的方法
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitMQListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String message) {System.out.println("消费者收到了 simple.queue 的消息:【" + message + "】");}}
启动 consumer 模块的启动类之后,就可以看到消息的内容
12. Work Queues 模型
12.1 Work Queues 的概念
Work Queues,简单地来说,就是让多个消费者绑定到一个队列,共同消费队列中的消息
虽然有多个消费者绑定同一个队列,但是队列中的某一条消息只会被一个消费者消费
我们实现一个小案例,模拟 Work Queues,实现一个队列绑定多个消费者
案例要求如下:
- 在RabbitMQ的控制台创建一个队列,名为 work.queue
- 在 publisher 服务中定义测试方法,在 1 秒内产生 50 条消息,发送到work.queue
- 在 consumer 服务中定义两个消息监听者,都监听 work.queue 队列
- 消费者 1 每秒处理 50 条消息,消费者 2 每秒处理 5 条消息
在 publisher服务的 SpringAmqp 测试类中添加以下方法,该方法可以在 1 秒内产生 50 条消息
@Test
void testWorkQueue() throws InterruptedException {String queueName = "work.queue";for (int i = 1; i <= 50; i++) {String message = "Hello, work queues_" + i;rabbitTemplate.convertAndSend(queueName, message);Thread.sleep(20);}
}
在 consumer 服务的 RabbitMQListener 类中添加以下方法,监听 work.queue 队列
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) {System.out.println("消费者1 收到了 work.queue的消息:【" + message + "】");
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) {System.err.println("消费者2 收到了 work.queue的消息...... :【" + message + "】");
}
12.2 Work Queues 模型的消息推送机制
如果有两个或两个以上的消费者监听同一个队列,默认情况下 RabbitMQ 会采用轮询的方法将消息分配给每个队列
但每个消费者的消费能力可能是不一样的,我们给两个消费者中的代码设置不同的休眠时间,模拟消费能力的不同
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) throws InterruptedException {System.out.println("消费者1 收到了 work.queue的消息:【" + message + "】");Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) throws InterruptedException {System.err.println("消费者2 收到了 work.queue的消息...... :【" + message + "】");Thread.sleep(100);
}
经过测试可以发现,即使两个队列的消费能力不一样,默认情况下 RabbitMQ 还是会采用轮询的方法将消息分配给每个队列,也就是平均分配
但这不是我们想要的效果,我们想要的效果是消费能力强的消费者处理更多的消息,甚至能够帮助消费能力弱的消费者
怎么样才能达到这样的效果呢,只需要在配置文件中添加以下信息
spring:rabbitmq:listener:simple:prefetch: 1
这个配置信息相当于告诉消费者要一条一条地从队列中取出消息,只有处理完一条消息才能取出下一条
这样一来,就可以充分利用每一台机器的性能,让消费能力强的消费者处理更多的消息,同时还可以避免消息在消费能力较弱的消费者上发生堆积的情况
13. 交换机
真正的生产环境都会经过交换机来发送消息,而不是直接发送到队列
交换机的作用:
- 接收 publisher 发送的消息
- 将消息按照规则路由到与交换机绑定的队列
交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题
注意事项:交换机只能路由和转发消息,不能存储消息
13.1 Fanout 交换机
13.1.1 Fanout 交换机的概念
Fanout 交换机会将接收到的消息广播到每一个跟其绑定的 queue ,所以也叫广播模式
13.1.2 快速上手
我们做一个小案例来体验 Fanout 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 fanout.queue1 和 fanout.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.fanout,将两个队列与其绑定
- 在 consumer 服务中,编写两个消费者方法,分别监听 fanout.queue1 和 fanout.queue2 队列
- 在 publisher 服务中编写测试方法,向 blog.fanout 交换机发送消息
声明交换机
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 fanout.queue1 和 fanout.queue2 队列
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {System.out.println("消费者1 收到了 fanout.queue1的消息:【" + message + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {System.err.println("消费者2 收到了 fanout.queue2的消息...... :【" + message + "】");
}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.fanout 交换机发送消息
@Test
void testSendFanout() {String exchangeName = "blog.fanout";String message = "Hello, fanout exchange";rabbitTemplate.convertAndSend(exchangeName, null, message);
}
13.2 Direct 交换机
13.2.1 Direct 交换机的概念
Direct 交换机会将接收到的消息根据规则路由到指定的队列,被称为定向路由
- 每一个 Queue 都与 Exchange 设置一个 bindingKey
- 发布者发送消息时,指定消息的 RoutingKey
- Exchange 将消息路由到 bindingKey 与消息 routingKey 一致的队列
需要注意的是:同一个队列可以绑定多个 bindingKey ,如果有多个队列绑定了同一个 bindingKey ,就可以实现类似于 Fanout 交换机的效果。由此可以看出,Direct 交换机的功能比 Fanout 交换机更强大
13.2.2 快速上手
我们做一个小案例来体验 Direct 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 direct.queue1 和 direct.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.direct ,将上面创建的两个队列与其绑定
- 在 consumer 服务中,编写两个消费者方法,分别监听 direct.queue1 和 direct.queue2
- 在 publisher 服务中编写测试方法,利用不同的 RoutingKey 向 blog.direct 交换机发送消息
为 direct.queue1队列 和 direct.queue2 队列分别指定 bindingKey
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 direct.queue1 和 direct.queue2 队列
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String message) {System.out.println("消费者1 收到了 direct.queue1的消息:【" + message + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String message) {System.err.println("消费者2 收到了 direct.queue2的消息...... :【" + message + "】");
}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.direct交换机发送消息
@Test
void testSendDirect() {String exchangeName = "blog.direct";String blueMessage = "蓝色通知,警报解除,哥斯拉放的是气球";rabbitTemplate.convertAndSend(exchangeName, "blue", blueMessage);String redMessage = "红色警报,由于日本排放核污水,惊现哥斯拉!";rabbitTemplate.convertAndSend(exchangeName, "red", redMessage);String yellowMessage = "黄色通知,哥斯拉来了,快跑!";rabbitTemplate.convertAndSend(exchangeName, "yellow", yellowMessage);
}
13.3 Topic 交换机(推荐使用)
13.3.1 Topic 交换机的概念
Topic Exchange 与 Direct Exchange类似,区别在于 Topic Exchange 的 routingKey 可以是多个单词的列表(多个 routingKey 之间以.
分割)
Queue 与 Exchange 指定 bindingKey 时可以使用通配符
- #:代指 0 个或多个单词
- *:代指 1 个单词
- Topic 交换机能实现的功能 Direct 交换机也能实现,不过用 Topic 交换机实现起来更加方便
- 如果某条消息的 topic 符合多个 queue 的 bindingKey ,该条消息会发送给符合条件的所有 queue ,实现类似于 Fanout 交换机的效果
13.3.2 快速上手
我们做一个小案例来体验 Topic 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 topic.queue1 和 topic.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.topic ,将两个队列与其绑定
- 在 consumer 服务中编写两个消费者方法,分别监听 topic.queue1 和 topic.queue2
- 在 publisher 服务中编写测试方法,利用不同的 routingKey 向 blog.topic 发送消息
为 topic.queue1 和 topic.queue2 队列分别指定 bindingKey
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 direct.queue1 和 direct.queue2 队列
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String message) {System.out.println("消费者1 收到了 topic.queue1的消息:【" + message + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String message) {System.err.println("消费者2 收到了 topic.queue2的消息...... :【" + message + "】");
}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.direct交换机发送消息
@Test
void testSendTopic() {String exchangeName = "blog.topic";String weatherMessage = "今天天气挺不错,我的心情的挺好的";rabbitTemplate.convertAndSend(exchangeName, "china.weather", weatherMessage);String newsMessage = "蓝色通知,警报解除,哥斯拉放的是气球";rabbitTemplate.convertAndSend(exchangeName, "china.news", newsMessage);
}
14. 在 SpringBoot 项目中声明队列和交换机的方式
我们之前创建队列和交换机都是在 RabbitMQ 的控制台页面中创建的,不仅十分繁琐,还有可能打错队列和交换机的名。而且,不同的环境(开发环境、测试环境、生产环境)可能会有不同的队列和交换机,手动创建队列和交换机效率十分低下
接下来为大家介绍两种在 SpringBoot 项目中声明队列和交换机的方式
14.1 编程式声明
14.1.1 SpringAQMP提供的创建队列和交换机的类
SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类 QueueBuilder 构建
- Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建
14.1.2 快速上手
我们创建一个 Fanout 类型的交换机,并且创建队列与这个交换机绑定
在 consumer 服务中编写 FanoutConfiguration 配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange3() {return ExchangeBuilder.fanoutExchange("blog.fanout3").build();}@Beanpublic FanoutExchange fanoutExchange4() {return new FanoutExchange("blog.fanout4");}@Beanpublic Queue fanoutQueue3() {return new Queue("fanout.queue3");}@Beanpublic Queue fanoutQueue4() {return QueueBuilder.durable("fanout.queue4").build();}@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange3) {return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);}@Beanpublic Binding fanoutBinding4() {return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange4());}}
启动 consumer 的启动类之后,队列、交换机、队列和交换机之间的关系就会自动创建
创建 Queue 时,如果没有指定 durable 属性,则 durable 属性默认为 true
14.1.3 编程式声明的缺点
编程式声明有一个缺点,当队列和交换机之间绑定的 routingKey 有很多个时,编码将会变得十分麻烦
以下是一个队列与 Direct 类型交换机绑定两个 routingKey 时的代码
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange3() {return new DirectExchange("blog.direct3");}@Beanpublic Queue directQueue3() {return new Queue("direct.queue3");}@Beanpublic Queue directQueue4() {return new Queue("direct.queue4");}@Beanpublic Binding directQueue3BindingRed(Queue directQueue3, DirectExchange directExchange3) {return BindingBuilder.bind(directQueue3).to(directExchange3).with("red");}@Beanpublic Binding directQueue3BindingBlue(Queue directQueue3, DirectExchange directExchange3) {return BindingBuilder.bind(directQueue3).to(directExchange3).with("blue");}@Beanpublic Binding directQueue4BindingRed(Queue directQueue4, DirectExchange directExchange3) {return BindingBuilder.bind(directQueue4).to(directExchange3).with("red");}@Beanpublic Binding directQueue4BindingBlue(Queue directQueue4, DirectExchange directExchange3) {return BindingBuilder.bind(directQueue4).to(directExchange3).with("yellow");}}
14.2 注解式声明(推荐使用)
SpringAMOP 提供了基于@RabbitListener
注解声明队列和交换机的方式
我们先在 RabbitMQ 的控制台删除 blog.direct 交换机、 direct.queue1 队列和 direct.queue2 队列
再改造 consumer 服务的 RabbitMQListener 类的监听 direct.queue1 队列和 direct.queue2 队列的方法
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "blog.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String message) {System.out.println("消费者1 收到了 direct.queue1的消息:【" + message + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "blog.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String message) {System.out.println("消费者2 收到了 direct.queue2的消息:【" + message + "】");
}
15. 消息转换器
在了解消息转换器之前,我们先来做一个小案例,案例的内容是利用 SpringAMQP 发送一条消息,消息的内容为一个 Java 对象
案例要求如下:
- 在 RabbitMQ 控制台创建一个队列,名为 object.queue
- 编写单元测试,向该队列中直接发送一条消息,消息的内容为 Map
- 在控制台查看消息
在 publisher 服务的 SpringAmqpTests 测试类中新增 testSendObject 方法
@Test
void testSendObject() {Map<String, Object> hashMap = new HashMap<>(2);hashMap.put("name", "Tom");hashMap.put("age", 21);rabbitTemplate.convertAndSend("object.queue", hashMap);
}
成功发送消息后,我们在 RabbitMQ 的控制台查看消息的具体内容
可以发现,消息的内容类型为 application/x-java-serialized-object
,并且消息的内容也变成一堆乱码
我们本来是想发送一个简单的仅含有姓名和年龄两个字段的简短信息,但是消息却变成了一堆乱码,不仅可读性大大下降,而且占用的空间也大大地增加了,这显然不是我们想要的效果
15.1 默认的消息转换器
Spring 处理对象类型的消息是由 org.springframework.amap.support.converter.MessageConverter
接口来处理的,该接口默认实现是 SimpleMessageConverter
SimpleMessageConverter
类是基于 JDK 提供的 ObjectOutputStream
来类完成序列化的,这种序列化方式存在以下问题:
- 使用 JDK 序列化有安全风险(如果序列化后的消息被恶意篡改,在反序列化的过程中可能会执行一些高危的代码)
- 经过 JDK 序列化的消息占用空间很大
- 经过 JDK 序列化的消息可读性很差
15.2 自定义消息转换器
一般建议采用 JSON 序列化代替默认的 JDK 序列化
要使用 JSON 序列化,需要先引入 jackson 依赖(在项目的父工程中引入)
如果是 Web 项目,无需引入该依赖,因为 spring-boot-starter-web 依赖中已包含该依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>
接着在 consumer 服务和 publisher 服务中配置 MessageConverter
@Bean
public MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();
}
再次发送对象类型的消息,可以看到消息已经成功转换成 JSON 类型的字符串
我们也可以在 consumer 服务的 RabbitMQListener 类中添加对 object.queue 队列的监听(用什么类型发,就用什么类型接收)
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> hashMap) {System.out.println("消费者收到了 object.queue的消息:【" + hashMap + "】");
}
启动 consumer 服务的启动类之后,在控制台中可以看到被转换成 JSON 格式的消息
在控制台中会看到报错信息,因为之前有一条用 JDK 序列化的消息,现在改用了 jackson 序列化,序列化和反序列化用的序列化器不一样,肯定会报错
报错后,消息就没了,出现了消息丢失的现象,该怎么解决呢,可以参考我的另一篇博文:RabbitMQ 高级篇
相关文章:
RabbitMQ快速入门(MQ的概念、安装RabbitMQ、在 SpringBoot 项目中集成 RabbitMQ )
文章目录 1. 补充知识:同步通讯和异步通讯1.1 同步通讯1.2 异步通讯 2. 同步调用的缺点2.1 业务耦合2.2 性能较差2.3 级联失败 3. 什么情况下使用同步调用4. 异步调用5. 异步调用的优点和缺点5.1 异步调用的优点5.1.1 解除耦合,拓展性强5.1.2 无需等待&a…...
Linux文件与目录管理命令 ls cp rm mv使用方法
Linux文件与目录的管理基本上包括:显示属性、复制、删除、移动文件与目录等,由于文件与目录的管理不仅重要而且操作频繁,所以本文列举一些常用的管理命令。 如需了解路径的概念及目录的基本操作,可参考【Linux】路径的概念及目录的…...
KubeSphere 部署的 Kubernetes 集群使用 GlusterFS 存储实战入门
转载:KubeSphere 部署的 Kubernetes 集群使用 GlusterFS 存储实战入门 知识点 定级:入门级 GlusterFS 和 Heketi 简介 GlusterFS 安装部署 Heketi 安装部署 Kubernetes 命令行对接 GlusterFS 实战服务器配置(架构1:1复刻小规模生产环境,…...
elasticsearch源码分析-08Serch查询流程
Serch查询流程 查询请求Rest路由注册也是在actionModule中 //查询操作 registerHandler.accept(new RestSearchAction());Override public List<Route> routes() {return unmodifiableList(asList(new Route(GET, "/_search"),new Route(POST, "/_searc…...
【协作提效 Go - gin ! swagger】
什么是swagger Swagger 是一个用于设计、构建、记录和使用 RESTful Web 服务的工具集。它的主要作用包括: API 文档生成:Swagger 可以自动生成详细的 API 文档,包括每个端点的请求和响应格式、参数、状态码等。这使得开发者和用户可以轻松理…...
栈和队列——3.滑动窗口最大值
力扣题目链接 给定一个数组 nums,有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值。 示例: 输入:nums[1,3,-1,-3,5,3,6,7],k 3 …...
嵌入式智能手表开发系列文章之开篇
不好意思,朋友们,我回来了。想想已经断更了好久了。在这段断更的日子里。开拓了个新领域,不搞android 产品,而是去搞嵌入式智能手表啦。 接下来我会用几篇文章来介绍下我对这个领域的看法体会,以及我自己所负责领域的…...
24.8.2数据结构|双链表
双链表 1、定义结构:2个指针域、数据域 2、初始化:创建一个含有N个结点的带头结点双链表head (双链表头结点的前驱与和尾节点的后继与置为空) 3、求表长:返回双链表head的长度 4、取元素:取出双链表head中…...
RabbitMQ高级特性 - 事务消息
文章目录 RabbitMQ 事务消息概述实现原理代码实现不采用事务采用事务 RabbitMQ 事务消息 概述 RabbitMQ 的 AMQP 协议实现了事务机制,允许开发者保证消息的发送和接收时原子性的,也就是说,要么消息全都发送成功,要么全都发送失败…...
leetcode:心算挑战
题目: 心算项目的挑战比赛中,要求选手从N张卡牌中选出cnt张卡牌,若这cnt张卡牌数字总和为偶数,则选手成绩「有效」且得分为cnt张卡牌数字总和。给定数组cards和cnt,其中cards[i]表示第i张卡牌上的数字。 请帮参赛选手计…...
docker部署java项目(war包方式)
场景描述:java项目war包,在开发开电脑上使用dockerfile构建镜像,上传镜像到客户服务器中使用docker加载docker镜像,然后部署。 目录 一、本地环境安装 docker git 二、服务器环境安装 docker 三、构建docker镜像(win系统) 四、注意事项 (1)系统架构 (2)使…...
jsp 自定义taglib
一、简介 我们在javaWeb开发中,经常会用到jsp的taglib标签,有时候并不能满足我们的实际需要,这就需要我们自定义taglib标签, 二、开发步骤 1、编写control方法,继承BodyTagSupport 2、定义zdytaglib.tld标签文件 3、…...
从一到无穷大 #32 TimeCloth,云上的快速 Point-in-Time Recovery
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。 文章目录 引言解决方案FAST FINE-GRAINED PITRLog FilterInter-Record Dependency ResolutionL…...
时间序列论文1——Forecasting at Scale
目录 0. AI总结0.1 文章概述0.2 研究背景0.3 研究思路0.4 研究结论与讨论1. Introduction2 Features of Business Time Series3 The Prophet Forecasting Model3.1 The Trend Model3.2 Seasonality3.3 Holidays and Events3.4 Model Fitting3.5 Analyst-in-the-Loop Modeling4 …...
HDFS常用命令
HDFS常用命令 1.HDFS命令介绍1.1基本语法格式1.2常用命令 1.HDFS命令介绍 HDFS 提供了一组命令行工具,用于管理和操作 HDFS 文件系统。 1.1基本语法格式 hdfs dfs -<命令> [选项] <参数>1.2常用命令 1.显示<path>指定的文件的详细信息。 had…...
请问如何做好软件测试工作呢?
一、明确测试目标和范围 理解测试目的:在开始测试之前,首先要明确测试的目标和范围,确保测试计划 与需求相匹配。这有助于测试人员聚焦在关键功能上,避免浪费时间和资源。制定详细的测试计划:根据项目需求࿰…...
单片机开发与Linux开发的区别
引言 单片机(MCU)和Linux开发是嵌入式系统领域的两大主要方向。它们在硬件平台、开发环境、应用场景和开发难度上存在显著区别。本文将系统性地比较单片机开发和Linux开发,探讨它们的主要区别及各自的应用场景和难度体系。 一、基本概念 1…...
【机器学习】回归类算法-相关性分析
一、前言 前面的几篇博客我们学习了分类算法,今天我们来了解一下回归类的算法吧。首先我们来谈谈两者有什么区别,首先是我们在之前的分类算法,这类算法可以将让我们学会如何将不同的数据划分到不同的类里面,输出的是一些离散的值。…...
java基础 之 集合与栈的使用(三)
文章目录 Map接口(一)实现类:HashMap特点HashMap集合的一些方法 (二)实现类: TreeMap特点【自然排序】代码【定制排序】代码TreeMap集合的一些方法 HashMap 和 TreeMap的区别 前文回顾: 戳这里 …...
JDK-java.nio包详解
JDK-java.nio包详解 概述 一直以来Java三件套(集合、io、多线程)都是最热门的Java基础技术点,我们要深入掌握好这三件套才能在日常开发中得心应手,之前有编写集合相关的文章,这里出一篇文章来梳理一下io相关的知识点。…...
虚拟机与服务器的区别是什么?虚拟机与服务器的区别和联系
服务器和虚拟机是两个不同的概念,它们在计算机领域有着不同的含义和作用。今天飞飞就和你分享虚拟机和服务器的区别和联系,希望可以帮助到你~ 1、物理形态 a)服务器是实实在在的物理设备,拥有独立的硬件架构。如CPU、硬盘、内存等 b)虚拟机…...
Linux CentOS stream9 命令
初学linux,对字符界面的命令并不陌生。问到什么是linux命令直接答cd、pwd、ls是linux命令。对于命令的定义并熟悉,也不太关心命令的底层执行逻辑,更关心录入命令,马上获取需要的结果。 本文就命令的定义、分类或执行优先级作一简单介绍。 一、定义 搜索网上对linux命令的…...
JavaScript基础——JavaScript变量声明
变量是存储数据的容器,可以变的量,值可以改变,在JavaScript中,变量声明的关键字有var、let,其中,var是ES5的语法,let是ES6的语法,变量需要先声明,在使用。 声明一个age变…...
ModuleNotFoundError: No Module Named openai
题意:Python 无法在环境中找到名为 openai 的模块 问题背景: import requests from bs4 import BeautifulSoup import openai #write each line of nuclear.txt to a list with open(nuclear.txt, r) as f:lines f.readlines()#remove the newline cha…...
基于SpringBoot+Vue的校园便利平台(带1w+文档)
基于SpringBootVue的校园便利平台(带1w文档) 基于SpringBootVue的校园便利平台(带1w文档) 本平台采用B/S架构、采用的数据库是MySQL,使用JAVA技术开发。该平台的开发方式无论在国内还是国外都比较常见,而且开发完成后使用普遍,可以给平台用户…...
串口应用编程-I.MX6U嵌入式Linux C应用编程学习笔记基于正点原子阿尔法开发板
串口应用编程 串口应用编程介绍 介绍 串口定义:串行接口,数据按顺序传输 串口特点:通信线路简单,距离远,速度较低 应用领域:常用工业接口 Linux系统中的作用 作为标准输入输出设备 系统打印信息输出 用户与系统交互 串口与终端:在Linux系统中,串口被视为一种终端&#…...
Canvas实现截图
<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>canvas实现截图功能</title><style>.ca…...
Python高性能计算:进程、线程、协程、并发、并行、同步、异步
这里写目录标题 进程、线程、协程并发、并行同步、异步I/O密集型任务、CPU密集型任务 进程、线程、协程 进程、线程和协程是计算机程序执行的三种不同方式,它们在资源管理、执行模型和调度机制上有显著的区别。以下是对它们的详细解释和比较: 进程&…...
kafka基本操作
Kafka详解 一、Kafka概述 Kafka是一个开源的分布式事件流平台,它主要用于高性能数据管道、流分析、数据集成和关键任务应用。Kafka最初被设计为一个分布式的基于发布/订阅模式的消息队列,但随着时间的推移,它已发展成为一个功能强大的流处理…...
JavaFX布局-Accordion
JavaFX布局-Accordion 一个可扩展的面板,包括标题、内容与TitledPane配合一起使用 public static Accordion demo1() {// 创建AccordionAccordion accordion new Accordion();// 内边距accordion.setPadding(new Insets(10, 10, 10, 10));for (int i 1; i < 1…...
帮赌博网站做推广被抓会判刑吗/网站历史权重查询
小车的硬件组装已经基本完成,软件调了一天也成功了。 遇到的问题有: 1、OLED显示初始化应该放在GPIO初始化和中断初始化后面,不然程序无法正常启动 2、距离显示和中断一起执行的问题,困扰了一天,最终把中断要执行的程…...
简单大方的网站/流量推广怎么做
在项目和产品的开发过程中难免会遇到使用viewpager指示器的UI实现,之前都是自己写来实现,麻烦不说代码量也比较多,之前也想过自己抽出来封装一个框架来处理,但是在后来发现MagicIndicator这个框架挺好用的,方法简单效果…...
Vs做的网站调试时如何适应网页/非国产手机浏览器
文章目录一、头文件二、操纵函数实现一、头文件 名为Ctest.h 头文件: #include<malloc.h> /* malloc()等 */#include<stdio.h> /* EOF(^Z或F6),NULL */#include<process.h> /* exit() */ /*函数结果状态码*/#define NULL 0 #define TRUE 1#def…...
国外网站托管/手机网页制作app
|前言在一般情况下请求静态资源是要用get请求,使用post请求的时候并且发送url是一个具体的资源的时候,网络会把url当做域名来解析。那如果硬是要通过POST来获取数据(客户说这样才安全),那在本地开发的时候我们就会遇到。|问题我们在开发的时候使用GET获取…...
wordpress定义/湖南疫情最新消息今天
有时候我们要向国外的客户发送邮件,邮件中就不能出现中文,如何自动设置: Step 1:单击Outlook上方的【文件】选项卡,在弹出的菜单中选择【选项】。 Step 2:点击【Outlook选项】对话框中的【高级】选项卡&…...
wordpress本地数据/拉新推广平台有哪些
我们编写的Web项目部署之后,经常会因为需要进行配置变更或功能迭代而重启服务,单纯的kill -9 pid的方式会强制关闭进程,这样就会导致服务端当前正在处理的请求失败,那有没有更优雅的方式来实现关机或重启呢?阅读本文需…...