RabbitMQ工作模式-发布订阅模式
Publish/Subscribe(发布订阅模式)
官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html
使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个消费者都可以消费完整的消息。
消息广播给所有订阅该消息的消费者。
在RabbitMQ中,生产者不是将消息直接发送给消息消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
生产者将消息发送给交换器。交换器非常简单,从生成者接收消息,将消息推送给消息队列。交换器必须清楚的知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器的类型。

发布订阅使用fanout的交换器,创建交换器,名称为test
channel.exchangeDeclare("test","fanout");
fanout交换器很简单,从名称就可以看出来(用风扇吹出去),将所有的收到的消息发给它的知道的所有队列。
存在一个默认的交换器。
此样例使用的是临时队列,即消费都实现将自动创建此队列,当消费都退出后,此队列也将自动删除。
队列名称如
amq.gen-gjKBgQ9PSmoj2YQGMOdPfA
样例代码
消费者1的代码:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class OneConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明的临时队列,名称由rabbitMQ自动生成String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("one 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
消费者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class TwoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("two 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
消费者3
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class ThirdConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 生成的临时队列String queueName = channel.queueDeclare().getQueue();System.out.println("临时队列的名称:" + queueName);// 定义交换机channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);// 消息队列与交换机的绑定channel.queueBind(queueName, "ex.testfan", "");channel.basicConsume(queueName,new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("third 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));}},new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}});}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class Product {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();try {// 声明fanout类型交换机channel.exchangeDeclare("ex.testfan", "fanout", true, false, false, null);for (int i = 0; i < 20; i++) {channel.basicPublish("ex.testfan",// 路由key"",// 属性null,// 信息("hello world fan " + i).getBytes(StandardCharsets.UTF_8));}} catch (IOException e) {throw new RuntimeException(e);} finally {channel.close();connection.close();}}
}
观察下队列的绑定的情况:
在未启动消费都队列之前:
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]#
在未启动消费者之前,只有看到几个默认的生产者。绑定的队列为空。
启动三个消费者:
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ ex.testfan │ fanout │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬────────────────────────────────┬──────────────────┬────────────────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue │ amq.gen-UG67rAw03FGbBupHX6o18g │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ │ exchange │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue │ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue │ │ │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan │ exchange │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue │ │ │
└─────────────┴─────────────┴────────────────────────────────┴──────────────────┴────────────────────────────────┴───────────┘
[root@nullnull-os ~]#
当启动生产者后,可以发现已经产生了3个默认的交换机及队列的绑定关系。以及手动绑定的3个队列的关系。
启动生产者,查看消费情况:
消费者1
临时队列的名称:amq.gen-VbV63vwAn0IBzC7n6I--vQ
one 获取到的消息:hello world fan 0
one 获取到的消息:hello world fan 1
one 获取到的消息:hello world fan 2
one 获取到的消息:hello world fan 3
one 获取到的消息:hello world fan 4
one 获取到的消息:hello world fan 5
one 获取到的消息:hello world fan 6
one 获取到的消息:hello world fan 7
one 获取到的消息:hello world fan 8
one 获取到的消息:hello world fan 9
one 获取到的消息:hello world fan 10
one 获取到的消息:hello world fan 11
one 获取到的消息:hello world fan 12
one 获取到的消息:hello world fan 13
one 获取到的消息:hello world fan 14
one 获取到的消息:hello world fan 15
one 获取到的消息:hello world fan 16
one 获取到的消息:hello world fan 17
one 获取到的消息:hello world fan 18
one 获取到的消息:hello world fan 19
消费者2:
临时队列的名称:amq.gen-KadV2OsCRLb84p2k_ijuww
two 获取到的消息:hello world fan 0
two 获取到的消息:hello world fan 1
two 获取到的消息:hello world fan 2
two 获取到的消息:hello world fan 3
two 获取到的消息:hello world fan 4
two 获取到的消息:hello world fan 5
two 获取到的消息:hello world fan 6
two 获取到的消息:hello world fan 7
two 获取到的消息:hello world fan 8
two 获取到的消息:hello world fan 9
two 获取到的消息:hello world fan 10
two 获取到的消息:hello world fan 11
two 获取到的消息:hello world fan 12
two 获取到的消息:hello world fan 13
two 获取到的消息:hello world fan 14
two 获取到的消息:hello world fan 15
two 获取到的消息:hello world fan 16
two 获取到的消息:hello world fan 17
two 获取到的消息:hello world fan 18
two 获取到的消息:hello world fan 19
消息者3:
临时队列的名称:amq.gen-TcqXVnoS2mjOpfCw1o1CZw
third 获取到的消息:hello world fan 0
third 获取到的消息:hello world fan 1
third 获取到的消息:hello world fan 2
third 获取到的消息:hello world fan 3
third 获取到的消息:hello world fan 4
third 获取到的消息:hello world fan 5
third 获取到的消息:hello world fan 6
third 获取到的消息:hello world fan 7
third 获取到的消息:hello world fan 8
third 获取到的消息:hello world fan 9
third 获取到的消息:hello world fan 10
third 获取到的消息:hello world fan 11
third 获取到的消息:hello world fan 12
third 获取到的消息:hello world fan 13
third 获取到的消息:hello world fan 14
third 获取到的消息:hello world fan 15
third 获取到的消息:hello world fan 16
third 获取到的消息:hello world fan 17
third 获取到的消息:hello world fan 18
third 获取到的消息:hello world fan 19
再停止几个消费者查看队列信息
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ ex.testfan │ fanout │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]#
可以看到,当客户端退出之后,临时队列也就消失了。
相关文章:
RabbitMQ工作模式-发布订阅模式
Publish/Subscribe(发布订阅模式) 官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html 使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个…...
JDK源码解析-Object
1. Object类 所有类的基类——java.lang.Object Object 类是所有类的基类,当一个类没有直接继承某个类时,默认继承Object类Object 类属于 java.lang 包,此包下的所有类在使用时无需手动导入,系统会在程序编译期间自动导入。 思…...
pinia——添加插件——基础积累
问题:是否给pinia添加过插件?具体添加的方式是什么? 在pinia中,我们可以为仓库添加插件,通过添加插件能够扩展以下的内容: 为 store 添加新的属性 定义 store 时增加新的选项 为 store 增加新的方法 包装现…...
软件国产化之殇
今天又看到这么一个帖子讨论一款国产化软件,属实给我震撼到了。 对于国产化产品,一直主打的都是”自研“,难道是我对”自研“这个词的理解有误? 做一个产品,别人开源了,你拿过来使用,你可以说…...
SQLyog问题处理集合
sqlyog 问题处理 1. 错误号码:1049错误: 数据库命令参数参考:数据库命令地址 检查数据库是否存在检查创建的数据库名称 与 要进行连接的数据库名称是否一致; 2. 错误号码:1819错误: MySQL授予远程连接权限时出现: …...
JavaSE【继承和多态】(1)(重点:初始化、pretected封装、组合)
一、继承 继承 (inheritance) 机制 :是面向对象程序设计使代码可以复用的最重要的手段,它允许程序员在保持原有类特 性 的基础上进行扩展,增加新功能 ,这样产生新的类,称 派生类 。 继承呈现了面向对象程序设计的层次结…...
无涯教程-Android Studio函数
第1步-系统要求 您将很高兴知道您可以在以下两种操作系统之一上开始Android应用程序的开发- MicrosoftWindows10/8/7/Vista/2003(32或64位)MacOSX10.8.5或更高版本,最高10.9(小牛) GNOME或KDE桌面 第二点是,开发Android应用程序所需的所有工具都是开源的,可以从Web上下载。以…...
CentOS8安装mysql8.0.24
一、下载mysql安装包并解压 执行以下命令: # 创建mysql安装目录 mkdir /usr/local/mysql # 进入mysql安装目录 cd /usr/local/mysql/ # 下载mysql-8.0.24 wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.24-linux-glibc2.12-x86_64.tar.xz # 解压…...
Quasi-eccentricity Error Modeling and Compensation in Vision Metrology
论文:Quasi-eccentricity Error Modeling and Compensation in Vision Metrology 中文:视觉计量中准偏心误差建模与补偿 论文地址:Sci-Hub | Quasi-eccentricity error modeling and compensation in vision metrology. Measurement Scienc…...
ai智能电话机器人是人类的助手和朋友
一直以来,人工智能都是人们关注的热门话题。在以前,说到人工智能,第一想到的是“机器人”,随着人工智能的普及,AI已经渗透到我们生活的每一个角落。现在,说起人工智能,可能会想到“无人驾驶、无…...
应用TortoiseSVN的SubWCRev管理VisualStudio C#项目编译版本号
首先要安装 TortoiseSVN, 并确保TortoiseSVN的bin目录被加入到系统环境变量Path中。 1、拷贝Porperties目录下的文件AssemblyInfo.cs生成副本AssemblyInfo.template, 作为版本管理的模板文件。 2、修改模板文件中的想要管理的版本号信息 // [assembly: AssemblyVersion(&quo…...
【八股】2023秋招八股复习笔记5(计算机网络-CN)
文章目录 八股目录目录1、应用层 & HTTP一些http题HTTPS 加密原理(问过)HTTP/1.1 新特性HTTP/2.0 与 RPC(问过)GET 和 POST 比较 2、传输层 & TCPTCP三次握手 & 四次挥手(问过)为什么每次TCP 连…...
【C++】SLT——Vector详解
本片要分享的是关于STL中Vector的内容,Vector的内容于string非常相似,只要会使用string那么学习Vector时会非常流畅。 目录 1.vector介绍 2.vector的简单实用 2.1.简单的无参构造 编辑2.2.简单带参构造 2.3.迭代器区间初始化 2.4.vector的遍历 …...
企业网络安全:威胁情报解决方案
什么是威胁情报 威胁情报是网络安全的关键组成部分,可为潜在的恶意来源提供有价值的见解,这些知识可帮助组织主动识别和防止网络攻击,通过利用 STIX/TAXII 等威胁源,组织可以检测其网络中的潜在攻击,从而促进快速检测…...
为什么2G、3G、4G成功了,5G却?
你可能已经多年来一直听到关于闪电般的5G的炒作。虽然新的无线网络在美国仍然没有普及,但5G正在波士顿和西雅图到达拉斯和堪萨斯城等城市慢慢出现。随着连接速度的加快,用户的安全性和隐私保护将增加,因为无线行业试图改善3G和4G的防御。但是…...
C语言每日一练------Day(10)
本专栏为c语言练习专栏,适合刚刚学完c语言的初学者。本专栏每天会不定时更新,通过每天练习,进一步对c语言的重难点知识进行更深入的学习。 今日练习题关键字:自除数 除自身以外数组的乘积 💓博主csdn个人主页ÿ…...
发力服务业务,龙湖集团半程领跑赢在“智慧”
成立三十载,龙湖集团一直是房地产行业“特立独行”的存在。 一方面,龙湖在对外战略方面长期量入为出,从不背上过重的“包袱”。 不久前,一则消息引发市场关注:龙湖集团提前偿还17亿元债务,已基本全部还清…...
Kubernetes(七)修改 pod 网络(flannel 插件)
一、 提示 需要重启服务器 操作之前备份 k8s 中所有资源的 yaml 文件 如下是备份脚本,仅供参考 # 创建备份目录 test -d $3 || mkdir $3 # $1 命名空间 # $2 资源名称: sts deploy configMap svc 等 # $3 资源备份存放的目录名称for app in kubec…...
测试平台metersphere
metersphere可以做接口测试、UI测试、性能测试。 metersphere接口测试底层是jmeter,可以做API管理,快捷调试,接口用例管理,接口自动化场景执行一键选取用例范围,生成测试报告。 会用jmeter,metersphere会…...
论文笔记: One Fits All:Power General Time Series Analysis by Pretrained LM
1 intro 时间序列领域预训练模型/foundation 模型的研究还不是很多 主要挑战是缺乏大量的数据来训练用于时间序列分析的基础模型——>论文利用预训练的语言模型进行通用的时间序列分析 为各种时间序列任务提供了一个统一的框架 论文还调查了为什么从语言领域预训练的Transf…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
