RabbitMQ工作模式-发布订阅模式
Publish/Subscribe(发布订阅模式)
官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html
使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个消费者都可以消费完整的消息。
消息广播给所有订阅该消息的消费者。
在RabbitMQ中,生产者不是将消息直接发送给消息消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
生产者将消息发送给交换器。交换器非常简单,从生成者接收消息,将消息推送给消息队列。交换器必须清楚的知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器的类型。

发布订阅使用fanout的交换器,创建交换器,名称为test
channel.exchangeDeclare("test","fanout");
fanout交换器很简单,从名称就可以看出来(用风扇吹出去),将所有的收到的消息发给它的知道的所有队列。
存在一个默认的交换器。
此样例使用的是临时队列,即消费都实现将自动创建此队列,当消费都退出后,此队列也将自动删除。
队列名称如
amq.gen-gjKBgQ9PSmoj2YQGMOdPfA
样例代码
消费者1的代码:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class OneConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明的临时队列,名称由rabbitMQ自动生成String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("one 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
消费者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class TwoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("two 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
消费者3
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class ThirdConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("third 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class Product {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();try {// 声明fanout类型交换机channel.exchangeDeclare("ex.testfan", "fanout", true, false, false, null);for (int i = 0; i < 20; i++) {channel.basicPublish("ex.testfan",// 路由key"",// 属性null,// 信息("hello world fan " + i).getBytes(StandardCharsets.UTF_8));}} catch (IOException e) {throw new RuntimeException(e);} finally {channel.close();connection.close();}}
}
观察下队列的绑定的情况:
在未启动消费都队列之前:
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]#
在未启动消费者之前,只有看到几个默认的生产者。绑定的队列为空。
启动三个消费者:
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ ex.testfan │ fanout │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬────────────────────────────────┬──────────────────┬────────────────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue │ amq.gen-UG67rAw03FGbBupHX6o18g │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue │ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue │ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue │ │ │
└─────────────┴─────────────┴────────────────────────────────┴──────────────────┴────────────────────────────────┴───────────┘
[root@nullnull-os ~]#
当启动生产者后,可以发现已经产生了3个默认的交换机及队列的绑定关系。以及手动绑定的3个队列的关系。
启动生产者,查看消费情况:
消费者1
临时队列的名称:amq.gen-VbV63vwAn0IBzC7n6I--vQ
one 获取到的消息:hello world fan 0
one 获取到的消息:hello world fan 1
one 获取到的消息:hello world fan 2
one 获取到的消息:hello world fan 3
one 获取到的消息:hello world fan 4
one 获取到的消息:hello world fan 5
one 获取到的消息:hello world fan 6
one 获取到的消息:hello world fan 7
one 获取到的消息:hello world fan 8
one 获取到的消息:hello world fan 9
one 获取到的消息:hello world fan 10
one 获取到的消息:hello world fan 11
one 获取到的消息:hello world fan 12
one 获取到的消息:hello world fan 13
one 获取到的消息:hello world fan 14
one 获取到的消息:hello world fan 15
one 获取到的消息:hello world fan 16
one 获取到的消息:hello world fan 17
one 获取到的消息:hello world fan 18
one 获取到的消息:hello world fan 19
消费者2:
临时队列的名称:amq.gen-KadV2OsCRLb84p2k_ijuww
two 获取到的消息:hello world fan 0
two 获取到的消息:hello world fan 1
two 获取到的消息:hello world fan 2
two 获取到的消息:hello world fan 3
two 获取到的消息:hello world fan 4
two 获取到的消息:hello world fan 5
two 获取到的消息:hello world fan 6
two 获取到的消息:hello world fan 7
two 获取到的消息:hello world fan 8
two 获取到的消息:hello world fan 9
two 获取到的消息:hello world fan 10
two 获取到的消息:hello world fan 11
two 获取到的消息:hello world fan 12
two 获取到的消息:hello world fan 13
two 获取到的消息:hello world fan 14
two 获取到的消息:hello world fan 15
two 获取到的消息:hello world fan 16
two 获取到的消息:hello world fan 17
two 获取到的消息:hello world fan 18
two 获取到的消息:hello world fan 19
消息者3:
临时队列的名称:amq.gen-TcqXVnoS2mjOpfCw1o1CZw
third 获取到的消息:hello world fan 0
third 获取到的消息:hello world fan 1
third 获取到的消息:hello world fan 2
third 获取到的消息:hello world fan 3
third 获取到的消息:hello world fan 4
third 获取到的消息:hello world fan 5
third 获取到的消息:hello world fan 6
third 获取到的消息:hello world fan 7
third 获取到的消息:hello world fan 8
third 获取到的消息:hello world fan 9
third 获取到的消息:hello world fan 10
third 获取到的消息:hello world fan 11
third 获取到的消息:hello world fan 12
third 获取到的消息:hello world fan 13
third 获取到的消息:hello world fan 14
third 获取到的消息:hello world fan 15
third 获取到的消息:hello world fan 16
third 获取到的消息:hello world fan 17
third 获取到的消息:hello world fan 18
third 获取到的消息:hello world fan 19
再停止几个消费者查看队列信息
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ ex.testfan │ fanout │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]#
可以看到,当客户端退出之后,临时队列也就消失了。
相关文章:
RabbitMQ工作模式-发布订阅模式
Publish/Subscribe(发布订阅模式) 官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html 使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个…...
JDK源码解析-Object
1. Object类 所有类的基类——java.lang.Object Object 类是所有类的基类,当一个类没有直接继承某个类时,默认继承Object类Object 类属于 java.lang 包,此包下的所有类在使用时无需手动导入,系统会在程序编译期间自动导入。 思…...
pinia——添加插件——基础积累
问题:是否给pinia添加过插件?具体添加的方式是什么? 在pinia中,我们可以为仓库添加插件,通过添加插件能够扩展以下的内容: 为 store 添加新的属性 定义 store 时增加新的选项 为 store 增加新的方法 包装现…...
软件国产化之殇
今天又看到这么一个帖子讨论一款国产化软件,属实给我震撼到了。 对于国产化产品,一直主打的都是”自研“,难道是我对”自研“这个词的理解有误? 做一个产品,别人开源了,你拿过来使用,你可以说…...
SQLyog问题处理集合
sqlyog 问题处理 1. 错误号码:1049错误: 数据库命令参数参考:数据库命令地址 检查数据库是否存在检查创建的数据库名称 与 要进行连接的数据库名称是否一致; 2. 错误号码:1819错误: MySQL授予远程连接权限时出现: …...
JavaSE【继承和多态】(1)(重点:初始化、pretected封装、组合)
一、继承 继承 (inheritance) 机制 :是面向对象程序设计使代码可以复用的最重要的手段,它允许程序员在保持原有类特 性 的基础上进行扩展,增加新功能 ,这样产生新的类,称 派生类 。 继承呈现了面向对象程序设计的层次结…...
无涯教程-Android Studio函数
第1步-系统要求 您将很高兴知道您可以在以下两种操作系统之一上开始Android应用程序的开发- MicrosoftWindows10/8/7/Vista/2003(32或64位)MacOSX10.8.5或更高版本,最高10.9(小牛) GNOME或KDE桌面 第二点是,开发Android应用程序所需的所有工具都是开源的,可以从Web上下载。以…...
CentOS8安装mysql8.0.24
一、下载mysql安装包并解压 执行以下命令: # 创建mysql安装目录 mkdir /usr/local/mysql # 进入mysql安装目录 cd /usr/local/mysql/ # 下载mysql-8.0.24 wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.24-linux-glibc2.12-x86_64.tar.xz # 解压…...
Quasi-eccentricity Error Modeling and Compensation in Vision Metrology
论文:Quasi-eccentricity Error Modeling and Compensation in Vision Metrology 中文:视觉计量中准偏心误差建模与补偿 论文地址:Sci-Hub | Quasi-eccentricity error modeling and compensation in vision metrology. Measurement Scienc…...
ai智能电话机器人是人类的助手和朋友
一直以来,人工智能都是人们关注的热门话题。在以前,说到人工智能,第一想到的是“机器人”,随着人工智能的普及,AI已经渗透到我们生活的每一个角落。现在,说起人工智能,可能会想到“无人驾驶、无…...
应用TortoiseSVN的SubWCRev管理VisualStudio C#项目编译版本号
首先要安装 TortoiseSVN, 并确保TortoiseSVN的bin目录被加入到系统环境变量Path中。 1、拷贝Porperties目录下的文件AssemblyInfo.cs生成副本AssemblyInfo.template, 作为版本管理的模板文件。 2、修改模板文件中的想要管理的版本号信息 // [assembly: AssemblyVersion(&quo…...
【八股】2023秋招八股复习笔记5(计算机网络-CN)
文章目录 八股目录目录1、应用层 & HTTP一些http题HTTPS 加密原理(问过)HTTP/1.1 新特性HTTP/2.0 与 RPC(问过)GET 和 POST 比较 2、传输层 & TCPTCP三次握手 & 四次挥手(问过)为什么每次TCP 连…...
【C++】SLT——Vector详解
本片要分享的是关于STL中Vector的内容,Vector的内容于string非常相似,只要会使用string那么学习Vector时会非常流畅。 目录 1.vector介绍 2.vector的简单实用 2.1.简单的无参构造 编辑2.2.简单带参构造 2.3.迭代器区间初始化 2.4.vector的遍历 …...
企业网络安全:威胁情报解决方案
什么是威胁情报 威胁情报是网络安全的关键组成部分,可为潜在的恶意来源提供有价值的见解,这些知识可帮助组织主动识别和防止网络攻击,通过利用 STIX/TAXII 等威胁源,组织可以检测其网络中的潜在攻击,从而促进快速检测…...
为什么2G、3G、4G成功了,5G却?
你可能已经多年来一直听到关于闪电般的5G的炒作。虽然新的无线网络在美国仍然没有普及,但5G正在波士顿和西雅图到达拉斯和堪萨斯城等城市慢慢出现。随着连接速度的加快,用户的安全性和隐私保护将增加,因为无线行业试图改善3G和4G的防御。但是…...
C语言每日一练------Day(10)
本专栏为c语言练习专栏,适合刚刚学完c语言的初学者。本专栏每天会不定时更新,通过每天练习,进一步对c语言的重难点知识进行更深入的学习。 今日练习题关键字:自除数 除自身以外数组的乘积 💓博主csdn个人主页ÿ…...
发力服务业务,龙湖集团半程领跑赢在“智慧”
成立三十载,龙湖集团一直是房地产行业“特立独行”的存在。 一方面,龙湖在对外战略方面长期量入为出,从不背上过重的“包袱”。 不久前,一则消息引发市场关注:龙湖集团提前偿还17亿元债务,已基本全部还清…...
Kubernetes(七)修改 pod 网络(flannel 插件)
一、 提示 需要重启服务器 操作之前备份 k8s 中所有资源的 yaml 文件 如下是备份脚本,仅供参考 # 创建备份目录 test -d $3 || mkdir $3 # $1 命名空间 # $2 资源名称: sts deploy configMap svc 等 # $3 资源备份存放的目录名称for app in kubec…...
测试平台metersphere
metersphere可以做接口测试、UI测试、性能测试。 metersphere接口测试底层是jmeter,可以做API管理,快捷调试,接口用例管理,接口自动化场景执行一键选取用例范围,生成测试报告。 会用jmeter,metersphere会…...
论文笔记: One Fits All:Power General Time Series Analysis by Pretrained LM
1 intro 时间序列领域预训练模型/foundation 模型的研究还不是很多 主要挑战是缺乏大量的数据来训练用于时间序列分析的基础模型——>论文利用预训练的语言模型进行通用的时间序列分析 为各种时间序列任务提供了一个统一的框架 论文还调查了为什么从语言领域预训练的Transf…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
C++中string流知识详解和示例
一、概览与类体系 C 提供三种基于内存字符串的流,定义在 <sstream> 中: std::istringstream:输入流,从已有字符串中读取并解析。std::ostringstream:输出流,向内部缓冲区写入内容,最终取…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
《基于Apache Flink的流处理》笔记
思维导图 1-3 章 4-7章 8-11 章 参考资料 源码: https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...
C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
论文笔记——相干体技术在裂缝预测中的应用研究
目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术:基于互相关的相干体技术(Correlation)第二代相干体技术:基于相似的相干体技术(Semblance)基于多道相似的相干体…...
HubSpot推出与ChatGPT的深度集成引发兴奋与担忧
上周三,HubSpot宣布已构建与ChatGPT的深度集成,这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋,但同时也存在一些关于数据安全的担忧。 许多网络声音声称,这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...
