RabbitMQ工作模式-发布订阅模式
Publish/Subscribe(发布订阅模式)
官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html
使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个消费者都可以消费完整的消息。
消息广播给所有订阅该消息的消费者。
在RabbitMQ中,生产者不是将消息直接发送给消息消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
生产者将消息发送给交换器。交换器非常简单,从生成者接收消息,将消息推送给消息队列。交换器必须清楚的知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器的类型。
发布订阅使用fanout的交换器,创建交换器,名称为test
channel.exchangeDeclare("test","fanout");
fanout
交换器很简单,从名称就可以看出来(用风扇吹出去),将所有的收到的消息发给它的知道的所有队列。
存在一个默认的交换器。
此样例使用的是临时队列,即消费都实现将自动创建此队列,当消费都退出后,此队列也将自动删除。
队列名称如
amq.gen-gjKBgQ9PSmoj2YQGMOdPfA
样例代码
消费者1的代码:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class OneConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明的临时队列,名称由rabbitMQ自动生成String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("one 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
消费者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class TwoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("two 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
消费者3
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class ThirdConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("third 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class Product {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();try {// 声明fanout类型交换机channel.exchangeDeclare("ex.testfan", "fanout", true, false, false, null);for (int i = 0; i < 20; i++) {channel.basicPublish("ex.testfan",// 路由key"",// 属性null,// 信息("hello world fan " + i).getBytes(StandardCharsets.UTF_8));}} catch (IOException e) {throw new RuntimeException(e);} finally {channel.close();connection.close();}}
}
观察下队列的绑定的情况:
在未启动消费都队列之前:
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]#
在未启动消费者之前,只有看到几个默认的生产者。绑定的队列为空。
启动三个消费者:
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ ex.testfan │ fanout │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬────────────────────────────────┬──────────────────┬────────────────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue │ amq.gen-UG67rAw03FGbBupHX6o18g │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue │ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue │ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue │ │ │
└─────────────┴─────────────┴────────────────────────────────┴──────────────────┴────────────────────────────────┴───────────┘
[root@nullnull-os ~]#
当启动生产者后,可以发现已经产生了3个默认的交换机及队列的绑定关系。以及手动绑定的3个队列的关系。
启动生产者,查看消费情况:
消费者1
临时队列的名称:amq.gen-VbV63vwAn0IBzC7n6I--vQ
one 获取到的消息:hello world fan 0
one 获取到的消息:hello world fan 1
one 获取到的消息:hello world fan 2
one 获取到的消息:hello world fan 3
one 获取到的消息:hello world fan 4
one 获取到的消息:hello world fan 5
one 获取到的消息:hello world fan 6
one 获取到的消息:hello world fan 7
one 获取到的消息:hello world fan 8
one 获取到的消息:hello world fan 9
one 获取到的消息:hello world fan 10
one 获取到的消息:hello world fan 11
one 获取到的消息:hello world fan 12
one 获取到的消息:hello world fan 13
one 获取到的消息:hello world fan 14
one 获取到的消息:hello world fan 15
one 获取到的消息:hello world fan 16
one 获取到的消息:hello world fan 17
one 获取到的消息:hello world fan 18
one 获取到的消息:hello world fan 19
消费者2:
临时队列的名称:amq.gen-KadV2OsCRLb84p2k_ijuww
two 获取到的消息:hello world fan 0
two 获取到的消息:hello world fan 1
two 获取到的消息:hello world fan 2
two 获取到的消息:hello world fan 3
two 获取到的消息:hello world fan 4
two 获取到的消息:hello world fan 5
two 获取到的消息:hello world fan 6
two 获取到的消息:hello world fan 7
two 获取到的消息:hello world fan 8
two 获取到的消息:hello world fan 9
two 获取到的消息:hello world fan 10
two 获取到的消息:hello world fan 11
two 获取到的消息:hello world fan 12
two 获取到的消息:hello world fan 13
two 获取到的消息:hello world fan 14
two 获取到的消息:hello world fan 15
two 获取到的消息:hello world fan 16
two 获取到的消息:hello world fan 17
two 获取到的消息:hello world fan 18
two 获取到的消息:hello world fan 19
消息者3:
临时队列的名称:amq.gen-TcqXVnoS2mjOpfCw1o1CZw
third 获取到的消息:hello world fan 0
third 获取到的消息:hello world fan 1
third 获取到的消息:hello world fan 2
third 获取到的消息:hello world fan 3
third 获取到的消息:hello world fan 4
third 获取到的消息:hello world fan 5
third 获取到的消息:hello world fan 6
third 获取到的消息:hello world fan 7
third 获取到的消息:hello world fan 8
third 获取到的消息:hello world fan 9
third 获取到的消息:hello world fan 10
third 获取到的消息:hello world fan 11
third 获取到的消息:hello world fan 12
third 获取到的消息:hello world fan 13
third 获取到的消息:hello world fan 14
third 获取到的消息:hello world fan 15
third 获取到的消息:hello world fan 16
third 获取到的消息:hello world fan 17
third 获取到的消息:hello world fan 18
third 获取到的消息:hello world fan 19
再停止几个消费者查看队列信息
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ ex.testfan │ fanout │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]#
可以看到,当客户端退出之后,临时队列也就消失了。
相关文章:

RabbitMQ工作模式-发布订阅模式
Publish/Subscribe(发布订阅模式) 官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html 使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个…...

JDK源码解析-Object
1. Object类 所有类的基类——java.lang.Object Object 类是所有类的基类,当一个类没有直接继承某个类时,默认继承Object类Object 类属于 java.lang 包,此包下的所有类在使用时无需手动导入,系统会在程序编译期间自动导入。 思…...

pinia——添加插件——基础积累
问题:是否给pinia添加过插件?具体添加的方式是什么? 在pinia中,我们可以为仓库添加插件,通过添加插件能够扩展以下的内容: 为 store 添加新的属性 定义 store 时增加新的选项 为 store 增加新的方法 包装现…...
软件国产化之殇
今天又看到这么一个帖子讨论一款国产化软件,属实给我震撼到了。 对于国产化产品,一直主打的都是”自研“,难道是我对”自研“这个词的理解有误? 做一个产品,别人开源了,你拿过来使用,你可以说…...
SQLyog问题处理集合
sqlyog 问题处理 1. 错误号码:1049错误: 数据库命令参数参考:数据库命令地址 检查数据库是否存在检查创建的数据库名称 与 要进行连接的数据库名称是否一致; 2. 错误号码:1819错误: MySQL授予远程连接权限时出现: …...

JavaSE【继承和多态】(1)(重点:初始化、pretected封装、组合)
一、继承 继承 (inheritance) 机制 :是面向对象程序设计使代码可以复用的最重要的手段,它允许程序员在保持原有类特 性 的基础上进行扩展,增加新功能 ,这样产生新的类,称 派生类 。 继承呈现了面向对象程序设计的层次结…...

无涯教程-Android Studio函数
第1步-系统要求 您将很高兴知道您可以在以下两种操作系统之一上开始Android应用程序的开发- MicrosoftWindows10/8/7/Vista/2003(32或64位)MacOSX10.8.5或更高版本,最高10.9(小牛) GNOME或KDE桌面 第二点是,开发Android应用程序所需的所有工具都是开源的,可以从Web上下载。以…...

CentOS8安装mysql8.0.24
一、下载mysql安装包并解压 执行以下命令: # 创建mysql安装目录 mkdir /usr/local/mysql # 进入mysql安装目录 cd /usr/local/mysql/ # 下载mysql-8.0.24 wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.24-linux-glibc2.12-x86_64.tar.xz # 解压…...

Quasi-eccentricity Error Modeling and Compensation in Vision Metrology
论文:Quasi-eccentricity Error Modeling and Compensation in Vision Metrology 中文:视觉计量中准偏心误差建模与补偿 论文地址:Sci-Hub | Quasi-eccentricity error modeling and compensation in vision metrology. Measurement Scienc…...
ai智能电话机器人是人类的助手和朋友
一直以来,人工智能都是人们关注的热门话题。在以前,说到人工智能,第一想到的是“机器人”,随着人工智能的普及,AI已经渗透到我们生活的每一个角落。现在,说起人工智能,可能会想到“无人驾驶、无…...

应用TortoiseSVN的SubWCRev管理VisualStudio C#项目编译版本号
首先要安装 TortoiseSVN, 并确保TortoiseSVN的bin目录被加入到系统环境变量Path中。 1、拷贝Porperties目录下的文件AssemblyInfo.cs生成副本AssemblyInfo.template, 作为版本管理的模板文件。 2、修改模板文件中的想要管理的版本号信息 // [assembly: AssemblyVersion(&quo…...

【八股】2023秋招八股复习笔记5(计算机网络-CN)
文章目录 八股目录目录1、应用层 & HTTP一些http题HTTPS 加密原理(问过)HTTP/1.1 新特性HTTP/2.0 与 RPC(问过)GET 和 POST 比较 2、传输层 & TCPTCP三次握手 & 四次挥手(问过)为什么每次TCP 连…...

【C++】SLT——Vector详解
本片要分享的是关于STL中Vector的内容,Vector的内容于string非常相似,只要会使用string那么学习Vector时会非常流畅。 目录 1.vector介绍 2.vector的简单实用 2.1.简单的无参构造 编辑2.2.简单带参构造 2.3.迭代器区间初始化 2.4.vector的遍历 …...

企业网络安全:威胁情报解决方案
什么是威胁情报 威胁情报是网络安全的关键组成部分,可为潜在的恶意来源提供有价值的见解,这些知识可帮助组织主动识别和防止网络攻击,通过利用 STIX/TAXII 等威胁源,组织可以检测其网络中的潜在攻击,从而促进快速检测…...

为什么2G、3G、4G成功了,5G却?
你可能已经多年来一直听到关于闪电般的5G的炒作。虽然新的无线网络在美国仍然没有普及,但5G正在波士顿和西雅图到达拉斯和堪萨斯城等城市慢慢出现。随着连接速度的加快,用户的安全性和隐私保护将增加,因为无线行业试图改善3G和4G的防御。但是…...

C语言每日一练------Day(10)
本专栏为c语言练习专栏,适合刚刚学完c语言的初学者。本专栏每天会不定时更新,通过每天练习,进一步对c语言的重难点知识进行更深入的学习。 今日练习题关键字:自除数 除自身以外数组的乘积 💓博主csdn个人主页ÿ…...
发力服务业务,龙湖集团半程领跑赢在“智慧”
成立三十载,龙湖集团一直是房地产行业“特立独行”的存在。 一方面,龙湖在对外战略方面长期量入为出,从不背上过重的“包袱”。 不久前,一则消息引发市场关注:龙湖集团提前偿还17亿元债务,已基本全部还清…...

Kubernetes(七)修改 pod 网络(flannel 插件)
一、 提示 需要重启服务器 操作之前备份 k8s 中所有资源的 yaml 文件 如下是备份脚本,仅供参考 # 创建备份目录 test -d $3 || mkdir $3 # $1 命名空间 # $2 资源名称: sts deploy configMap svc 等 # $3 资源备份存放的目录名称for app in kubec…...

测试平台metersphere
metersphere可以做接口测试、UI测试、性能测试。 metersphere接口测试底层是jmeter,可以做API管理,快捷调试,接口用例管理,接口自动化场景执行一键选取用例范围,生成测试报告。 会用jmeter,metersphere会…...

论文笔记: One Fits All:Power General Time Series Analysis by Pretrained LM
1 intro 时间序列领域预训练模型/foundation 模型的研究还不是很多 主要挑战是缺乏大量的数据来训练用于时间序列分析的基础模型——>论文利用预训练的语言模型进行通用的时间序列分析 为各种时间序列任务提供了一个统一的框架 论文还调查了为什么从语言领域预训练的Transf…...

7.4.分块查找
一.分块查找的算法思想: 1.实例: 以上述图片的顺序表为例, 该顺序表的数据元素从整体来看是乱序的,但如果把这些数据元素分成一块一块的小区间, 第一个区间[0,1]索引上的数据元素都是小于等于10的, 第二…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...

React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...

【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...

如何应对敏捷转型中的团队阻力
应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中,明确沟通敏捷转型目的尤为关键,团队成员只有清晰理解转型背后的原因和利益,才能降低对变化的…...

保姆级【快数学会Android端“动画“】+ 实现补间动画和逐帧动画!!!
目录 补间动画 1.创建资源文件夹 2.设置文件夹类型 3.创建.xml文件 4.样式设计 5.动画设置 6.动画的实现 内容拓展 7.在原基础上继续添加.xml文件 8.xml代码编写 (1)rotate_anim (2)scale_anim (3)translate_anim 9.MainActivity.java代码汇总 10.效果展示 逐帧…...
高防服务器价格高原因分析
高防服务器的价格较高,主要是由于其特殊的防御机制、硬件配置、运营维护等多方面的综合成本。以下从技术、资源和服务三个维度详细解析高防服务器昂贵的原因: 一、硬件与技术投入 大带宽需求 DDoS攻击通过占用大量带宽资源瘫痪目标服务器,因此…...