【SpringCloud学习笔记】RabbitMQ(中)
1. 交换机概述
前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ,当时采用的是生产者直接将消息发布给队列,但是实际开发中不建议这么做,更加推荐生产者将消息发布到交换机(exchange),然后由exchange路由到队列,其架构如下所示:
可以看出,在发布-订阅模型中新增一个"交换机"角色,此后各个角色的任务如下:
- publisher:不再是将message直接转发到queue,而是将message转发给exchange
- exchange:一方面接收来自publisher生产的消息;另一方面,依据route key以及type将消息路由给绑定的不同的队列
- queue:与以前一样,暂存消息,供消费者消费,另外还需要同交换机建立绑定关系
- consumer:与以前一样,订阅queue中的消息,并进行业务处理消费消息
注意:由于我们的exchange不暂存消息,只做消息的路由,因此如果没有queue与exchange绑定或者routing key设置错误,就会导致消息丢失!!!
2. 交换机类型
RabbitMQ提供的交换机类型有如下四种:
- Fanout Exchange:扇出交换机,形象来说就是"广播交换机",会将消息路由给所有绑定的queue
- Direct Exchange:定向交换机,基于RoutingKey发给订阅的queue
- Topic Exchange: 通配符订阅,在Direct的基础上引入通配符
- Headers Exchange: 头匹配,基于MQ的消息头匹配,使用场景较少(此处不讲解)
2.1 Fanout Exchange
下面是Fanout Exchange的工作流程图:
特征:Fanout Exchange将消息路由给全部跟它绑定的queue
操作步骤:
- 在RabbitMQ控制台中新建两个队列:fanout.queue1、fanout.queue2
- 在RabbitMQ控制台中新建一个Fanout类型的Exchange:fanout.exchange
- 将fanout.exchange与fanout.queue1、fanout.queue2分别建立binding关系
- 新建两个方法用于模拟consumer,分别监听fanout.queue1以及fanout.queue2队列
/*** 订阅fanout.queue1队列* @param msg 消息*/
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {log.info("listener1 从【fanout.queue1】接收到消息:" + msg);
}/*** 订阅fanout.queue2队列* @param msg 消息*/
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {log.info("listener2 从【fanout.queue2】接收到消息:" + msg);
}
- 新建一个测试类方法,模拟将消息发布给fanout.exchange
/*** 测试FanoutExchange交换机类型*/
@Test
public void testFanoutExchange() {// 1. 定义exchange名称String exchangeName = "fanout.exchange";// 2. 定义消息体String msg = "震惊!某大学频频被曝出食堂安全问题";// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, "", msg);
}
- 观察结果
结果如上图所示:说明fanout.exchange雀氏将消息广播给了所有与之绑定的queue
2.2 Direct Exchange
特点:Direct Exchange要求在与queue建立binding关系的时候定义一个BindingKey,之后publisher生产者携带消息的同时也会指定RoutingKey,只有RoutingKey与BindingKey一致的queue才会被路由消息
工作流程如上图所示,其中queue1与exchange的Binding Key为"blue"以及"red",queue2与exchange的Binding Key为"yellow"以及"red",此时当Routing Key为"blue",Direct Exchange只会将消息路由给queue1
操作步骤:
- 在RabbitMQ控制台中新建两个队列:direct.queue1、direct.queue2
- 在RabbitMQ控制台中新建一个Direct类型的Exchange:direct.exchange
- 将direct.exchange与direct.queue1、direct.queue2分别建立binding关系,其中与queue1的binding key为"blue"与"red",与queue2的binding key为"yellow"与"red"
- 新建两个方法用于模拟consumer,分别监听direct.queue1以及direct.queue2队列
/*** 订阅direct.queue1队列* @param msg 消息*/
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {log.info("listener1 从【direct.queue1】接收到消息:" + msg);
}/*** 订阅direct.queue2队列* @param msg 消息*/
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {log.info("listener2 从【direct.queue2】接收到消息:" + msg);
}
- 新建一个测试类方法,模拟将消息发布给direct.exchange,并指定routing key为"blue"
/*** 测试DirectExchange交换机类型*/
@Test
public void testDirectExchange() {// 1. 定义交换机名称String exchangeName = "direct.exchange";// 2. 定义消息体String msg = "今日份消息只交给幸运色为blue的哦~";// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
- 观察结果
结果符合预期,只有direct.queue1能够接受到消息!
2.3 Topic Exchange
Topic Exchange与Direct Exchange非常类似,都可以依据BindingKey以及RoutingKey的匹配程度进而路由给特定符合条件的queue,但是Topic Exchange定义Binding Key可以为一组词,中间用"."进行分隔,并且支持使用通配符,规则如下:
#
:匹配0个或者多个词*
:匹配1个单词
例如现在queue1的BindingKey为"china.#“,而queue2的BindingKey为”#.news",而RoutingKey为"china.reports",此时可以路由给queue1,但是无法路由给queue2,如果RoutingKey为"china.news"则queue1、queue2均可以被路由
操作步骤:
- 在RabbitMQ控制台中新建两个队列:topic.queue1、topic.queue2
- 在RabbitMQ控制台中新建一个Topic类型的Exchange:topic.exchange
- 将topic.exchange与topic.queue1、topic.queue2分别建立binding关系,其中与queue1的binding key为"china.#“,与queue2的binding key为”#.news"
- 新建两个方法用于模拟consumer,分别监听topic.queue1以及topic.queue2队列
/*** 订阅topic.queue1队列* @param msg 消息*/
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {log.info("listener1 从【topic.queue1】接收到消息:" + msg);
}/*** 订阅topic.queue2队列* @param msg 消息*/
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {log.info("listener2 从【topic.queue2】接收到消息:" + msg);
}
- 新建一个测试类方法,模拟将消息发布给topic.exchange,并指定routing key为"china.news"
/*** 测试TopicExchange交换机类型*/
@Test
public void testTopicExchange() {// 1. 定义交换机名称String exchangeName = "topic.exchange";// 2. 定义消息体String msg = "中国新闻报,快来买呀!";// 3. 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
}
- 观察结果
证明通配符生效!
3. 声明队列和交换机
前面我们收发消息的过程是使用Java代码实现的,但是创建Queues以及Exchanges仍然需要我们在RabbitMQ提供的控制台实现,那么如何使用Java代码来创建Queue以及Exchange呢?
SpringAMQP API:
- 声明队列:使用
new Queue("队列名称")
创建 - 声明交换机:使用
new FanoutExchange("交换机名称")
(以FanoutExchange为例) - 声明绑定关系:使用
BindingBuilder.bind(队列对象).to(交换机对象)
构建
3.1 Fanout声明
步骤:
- 编写一个配置类,使用
@Configuration
声明 - 内部配置Queue、Exchange、Binding,并使用
@Bean
声明
@Configuration
public class FanoutConfig {/*** 声明FanoutExchange交换机* @return 返回FanoutExchange对象*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("code.fanout.exchange");}/*** 声明FanoutQueue队列* @return 返回FanoutQueue队列*/@Beanpublic Queue fanoutQueue() {return new Queue("code.fanout.queue");}/*** 声明绑定关系* @param fanoutExchange 交换机* @param fanoutQueue 队列* @return 绑定关系*/@Beanpublic Binding fanoutBinding(FanoutExchange fanoutExchange, Queue fanoutQueue) {return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}
}
3.2 Direct声明
步骤:
- 编写一个配置类,使用
@Configuration
声明 - 内部配置Queue、Exchange、Binding,并使用
@Bean
声明
@Configuration
public class DirectConfig {/*** 声明一个DirectExchange交换机* @return 返回一个DirectExchange类型对象*/@Beanpublic DirectExchange directExchange() {return new DirectExchange("code.direct.exchange");}/*** 声明一个Queue队列* @return 返回一个Queue类型对象*/@Beanpublic Queue directQueue() {return new Queue("code.direct.queue");}/*** 声明一个绑定关系* @return 返回Binding对象*/@Beanpublic Binding directBinding(DirectExchange directExchange, Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with("");}
}
3.3 基于注解声明
注解声明格式:
@Component
@Slf4j
public class AnnotateRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue("annotate.direct.queue"),key = {"blue", "red"},exchange = @Exchange(name = "annotate.direct.exchange", type = ExchangeTypes.DIRECT)))public void listenAnnotateDirect(String msg) {log.info("接收到消息:" + msg);}
}
4. 消息转换器
4.1 现象演示
前面我们都是将字符串类型的数据作为消息进行传输,那么如果是对象类型的消息呢,我们尝试发送一个自定义User类型作为消息传输:
/*** 自定义User类型* @author 米饭好好吃*/
@Data
@AllArgsConstructor
public class User implements Serializable {private String name;private Integer age;
}
@Test
public void testSendObject() {// 1. 声明队列名称String queueName = "work.queue";// 2. 定义消息体User user = new User("jack", 22);// 3. 发送消息rabbitTemplate.convertAndSend(queueName, user);
}
从RabbitMQ控制台中查看消息内容如下:
4.2 追踪源码
我们发现实际调用了convertMessageIfNecessary(object)
方法,我们继续追踪进去:
该方法判断object是否为Message类型,如果不是就调用getRequiredMessageConverter()
获取所需的消息转换器,继续追踪进去:
该方法返回了一个SimpleMessageConverter实例对象,因此我们回到上一层,获取到MessageConverter实例后又调用了toMessage
方法,我们继续追踪进去观察是如何转换消息的:
在AbstruectMessageConverter中实现了toMessage方法,而createMessage方法在子类 SimpleMessageConverter重写了该方法:
可以看出调用了SerialzationUtils.serialize(object)
进行了序列化,继续追踪观察到底是如何序列化的:
可以看出是借助ObjectOutputStream
进行序列化的,而这这个是JDK默认的序列化方式,该方式有如下缺点:
- 序列化过程不够安全,可能存在注入风险
- 序列化结果可读性较差
- 序列化结果占用体积较大
因此我们需要重写消息转换器中的序列化机制:
4.3 自定义JSON序列化器
因此JDK原生序列化器有诸多确定,因此我们需要使用自定义的JSON序列化器,此处需要引入jackson-databind
相关依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
/*** 消息转换器配置* @author 米饭好好吃*/
@Configuration
public class MessageConvertConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
验证结果:
在控制台中我们可以发现消息格式就是熟悉的JSON格式了
相关文章:
【SpringCloud学习笔记】RabbitMQ(中)
1. 交换机概述 前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ,当时采用的是生产者直接将消息发布给队列,但是实际开发中不建议这么做,更加推荐生产者将消息发布到交换机(exchange),然后由exchange路由…...
【C++】类和对象的引入
文章目录 前言一、类的定义二、类的访问控制与封装三、类的作用域四、类的实例化五、类的存储方式及大小计算六、隐藏的this指针 前言 C语言是面向过程的,关注的是过程,分析出求解问题的步骤,通过函数调用逐步解决问题。 C是基于面向对象的&…...
11.5.k8s中pod的调度-cordon,drain,delete
目录 一、概念 二、使用 1.cordon 停止调度 1.1.停止调度 1.2.解除恢复 2.drain 驱逐节点 2.1.驱逐节点 2.2.参数介绍 2.3.解除恢复 3.delete 删除节点 一、概念 cordon节点,drain驱逐节点,delete 节点,在对k8s集群节点执行维护&am…...
Java中线程的创建方式
一、继承Thread类,重写run方法 public class MyThread{public static void main(String[] args) {Thread threadDome new ThreadDome();threadDome.start();} }class ThreadDome extends Thread{Overridepublic void run() {for (int i 0; i < 5; i) {try {Th…...
猫头虎推荐20个值得体验的通用大模型
猫头虎推荐20个值得体验的通用大模型 🚀 大家好,我是猫头虎,一名专注于科技领域的自媒体博主。今天是周一,新的开始,我们来深入探讨一下当前最值得体验的通用大模型。这些AI模型不仅功能强大,而且在各自领…...
Novartis诺华制药社招综合能力性格动机问卷入职测评笔试题库答案及包过助攻
【华东同舟求职】由资深各行业从业者建立的一站式人才服务网络平台,现阶段目标是“提升全市场各行业岗位信息的流动性和透明度”。我们接受众多行业机构的直接委托发布,并尽力通过各种方法搜寻高价值岗位信息。事实上,我们以发现不为人知的优…...
Adam优化算法
Adam优化算法 Adam(Adaptive Moment Estimation)是一种用于训练深度学习模型的优化算法,由Diederik P. Kingma和Jimmy Ba在2014年提出。Adam结合了动量和自适应学习率的方法,具有高效、稳定和适应性强的特点,被广泛应…...
MYSQL 三、mysql基础知识 7(MySQL8其它新特性)
一、mysql8新特性概述 MySQL从5.7版本直接跳跃发布了8.0版本 ,可见这是一个令人兴奋的里程碑版本。MySQL 8版本在功能上 做了显著的改进与增强,开发者对MySQL的源代码进行了重构,最突出的一点是多MySQL Optimizer优化器进行了改进。不仅在速度…...
git error: does not have a commit checked out fatal: adding files failed
git add net error: net/ does not have a commit checked out fatal: adding files failed这个错误是因为尝试将一个尚未被提交的文件夹添加到Git中。解决这个问题的方法是先将文件夹中的文件提交到Git仓库中,然后再将文件夹添加到Git中。 首先,需要进…...
Java Websocket分片发送
一、分片发送和接收(复杂) 如果数据量太大,需要分多次发送, 需要考虑数据划分和重组的问题。 二、具体思路 每次发送和接收用一个布尔值变量指定是否为最后一个分片。 三、具体使用 (一)字符串分片发送: sendText(文本, 布尔值)…...
vivado NODE、PACKAGE_PIN
节点是Xilinx部件上用于路由连接或网络的设备对象。它是一个 WIRE集合,跨越多个瓦片,物理和电气 连接在一起。节点可以连接到单个SITE_, 而是简单地将NETs携带进、携带出或携带穿过站点。节点可以连接到 任何数量的PIP,并且也可以…...
JavaEE、SSM基础框架、JavaWeb、MVC(认识)
目录 一、引言 (0)简要介绍 (1)主要涉及的学习内容 (2)学习的必要性 (3)适用学习的人群(最好有这个部分的知识基础) (4)这个基础…...
【漏洞复现】飞企互联-FE企业运营管理平台 treeXml.jsp SQL注入漏洞
0x01 产品简介 飞企互联-FE企业运营管理平台是一个基于云计算、智能化、大数据、物联网、移动互联网等技术支撑的云工作台。这个平台可以连接人、链接端、联通内外,支持企业B2B、C2B与020等核心需求,为不同行业客户的互联网转型提供支持。其特色在于提供…...
Android基础-运行时权限
一、引言 随着智能手机和移动互联网的普及,Android操作系统作为其中的佼佼者,其安全性问题日益受到关注。为了保障用户数据的安全和隐私,Android系统引入了权限机制来管理和控制应用程序对系统资源和用户数据的访问。特别是在Android 6.0&am…...
postman断言及变量及参数化
1:postman断言 断言:判断接口是否执行成功的过程 针对接口请求完成之后,针对他的响应状态码及响应信息进行判断,代码如下: //判断响应信息状态码是否正确 pm.test("Status code is 200", function () { pm.response.…...
安装和使用TrinityCore NPCBot
安装TrinityCore NPCBot 官网:GitHub - trickerer/Trinity-Bots: NPCBots for TrinityCore and AzerothCore 3.3.5 基本安装方法 Follow TrinityCore Installation Guide (https://TrinityCore.info/) to install the server firstDownload NPCBots.patch and put …...
Hvv--知攻善防应急响应靶机--Linux2
HW–应急响应靶机–Linux2 所有靶机均来自 知攻善防实验室 靶机整理: 夸克网盘:https://pan.quark.cn/s/4b6dffd0c51a#/list/share百度云盘:https://pan.baidu.com/s/1NnrS5asrS1Pw6LUbexewuA?pwdtxmy 官方WP:https://mp.weixin.…...
replaceAll is not a function 详解
先说说原因: 在chrome 浏览器中使用 replaceAll 报这个错误,是因为chrome 版本过低, 在chrome 85 以上版本才支持 用法 replaceAll(pattern, replacement)const paragraph "I think Ruths dog is cuter than your dog!"; console…...
如何设置天锐绿盾的数据防泄密系统
设置天锐绿盾的数据防泄密系统,可以按照以下步骤进行: 一、系统安装与初始化 在线或离线安装天锐绿盾数据防泄密系统,确保以管理员身份运行安装包,并按照安装向导的提示完成安装。输入序列号进行注册,激活系统。 二…...
003 gitee怎样将默认的私有仓库变成公开仓库
先点击“管理”, 再点击“基本信息” 在“是否开源”里, 选择:开源...
Spring框架中的IOC(控制反转)详解
Spring框架中的IOC(控制反转)详解 一、引言 在软件开发中,设计模式与框架的应用极大地提高了开发效率和软件质量。其中,Spring框架因其强大的功能和灵活的扩展性,成为了Java企业级应用开发的首选。而Spring框架中的核…...
Score Matching(得分匹配)
Score Matching(得分匹配)是一种统计学习方法,用于估计概率密度函数的梯度(即得分函数),而无需知道密度函数的归一化常数。这种方法由Hyvrinen在2005年提出,主要用于无监督学习,特别…...
五大维度大比拼:ChatGPT比较文心一言,你的AI助手选择指南
文章目录 一、评估AI助手的五个关键维度二、ChatGPT和文心一言的比较 评估AI助手的五个关键维度,以及ChatGPT和文心一言的比较如下: 一、评估AI助手的五个关键维度 界面友好性 : 评估标准:用户界面是否直观易用,是否…...
大学课设项目,Windows端基于UDP的网络聊天程序的服务端和客户端
文章目录 前言项目需求介绍一、服务端1.对Udp套接字进行一个封装2. UdpServer的编写3. Task.h4.protocol.h的编写5.线程池的编写6.main.cc 二、客户端1. Socket.h2.protocol.h3.UdpClient4.menu.h5.main.cpp 三、运行图 前言 本次项目可以作为之前内容的一个扩展,学…...
【5.x】ELK日志分析、集群部署
ELK日志分析 一、ELK概述 1、ELK简介 ELK平台是一套完整的日志集中处理解决方案,将ElasticSearch、Logstash和Kiabana三个开源工具配合使用,完成更强大的用户对日志的查询、排序、统计需求。 一个完整的集中式日志系统,需要包含以下几个主…...
揭秘创业加盟:豫腾助力,发掘商机,共赢未来
在我们生活的这个充满活力与机遇的世界里,商业活动如繁星点点,照亮着每个人的创业梦想。 在这个过程中,创业加盟作为一种独特且吸引人的模式,逐渐受到广大创业者的关注。 本文将深入解析创业加盟的精髓,以及如何在其…...
Linux操作系统以及一些操作命令、安装教程
Web课程完结啦,这是Web第一天的课程大家有兴趣可以传送过去学习 http://t.csdnimg.cn/K547r Linux-Day01 课程内容 Linux简介 Linux安装 Linux常用命令 1. 前言 1.1 什么是Linux Linux是一套免费使用和自由传播的操作系统。说到操作系统,大家比…...
树莓派4B_OpenCv学习笔记6:OpenCv识别已知颜色_运用掩膜
今日继续学习树莓派4B 4G:(Raspberry Pi,简称RPi或RasPi) 本人所用树莓派4B 装载的系统与版本如下: 版本可用命令 (lsb_release -a) 查询: Opencv 版本是4.5.1: 学了这些OpenCv的理论性知识,不进行实践实在…...
ZSH 配置
ZSH 配置 1. 安装 ZSH2. 安装 oh my zsh3. 安装插件3.1 autojump3.2 zsh-autosuggestions 1. 安装 ZSH sudo apt-get install zsh 完成安装后需设置当前用户使用 zsh: chsh -s /bin/zsh 重启后即可使用 2. 安装 oh my zsh 安装 oh my zsh 需先安装 git。 自动安装…...
LogicFlow 学习笔记——5. LogicFlow 基础 主题 Theme
主题 Theme LogicFlow 提供了设置主题的方法,便于用户统一设置其内部所有元素的样式。设置方式有两种: 初始化LogicFlow时作为配置传入初始化后,调用LogicFlow的 setTheme 方法 主题配置参数见主题API 配置 new LogicFlow 时作为将主题配…...
wordpress主题的css在什么位置/税收大数据
1. 检查sql文件路径是否正确;2. 将sql文件移动到C:/Windows/Temp目录下,再打开试试。 转载于:https://www.cnblogs.com/Youyou-2608/p/8994202.html...
discuz网站建设教学视频教程/百度云资源搜索入口
<本文的原始位置: http://bluegene8210.is-programmer.com/posts/21815.html> 准备给程序添加数据库组件。因为该死的 MySQLdb 模块还不支持 Python 3, 只能暂时用土办法,通过 subprocess 模块连上 MySQL 服务器,然后用 stdin/stdout 做交流。基本…...
毕业设计代做网站价格/深圳关键词排名seo
今日执行项目更新时,手贱点击了cancel 中断了操作,最后导致项目被锁,杯具了。 首先想到了Clean up 直接提示 看来不行呀 …// 省略 n 多种尝试 最后使用删除db 中的 lock 表来解决了这个问题 首先cd 到项目下然后执行如下操作 c:\> sqlite3 .svn/…...
vps设置网站访问用户权限/网络广告投放网站
为什么80%的码农都做不了架构师?>>> 首先登录oracle,进入jdk8下载页面: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html 找到需要下载得版本,并获取下载链接地址:…...
南昌模板建站定制/seo方案
从Npm Script到Webpack,6种常见的前端构建工具对比 小编说:历史上先后出现了一系列构建工具,它们各有优缺点。由于前端工程师很熟悉JavaScript,Node.js又可以胜任所有构建需求,所以大多数构建工具都是用Node.js开发的。…...
福建嘉瑞建设工程有限公司网站/青岛百度seo排名
开发的时候,写了个很简单的Sql ,大概就是 总数除以数量 得出的平均值。看起来很平常是不是!简单来说就是 Total / Count 嘛!最多转个2位小数用Convert就完事了对不对。 但是呢,有些数据的Count值本身是就是0的。然后就…...