202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型
目录
- ★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型
- 代码演示:
- 生产者:producer
- 消费者:Consumer01
- 消费者:Consumer02
- 测试结果
- 完整代码
- ConnectionUtil
- Publisher
- Consumer01
- Consumer02
- pom.xml
★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型
就是声明一个 fanout 类型的 Exchange 来分发消息。消费者进行消费
fanout 类型就是广播模式。
fanout 类型 的 Exchange 不会判断消息的路由key,直接将消息分发给绑定到该Exchange的所有队列。
生产者发送一条消息到fanout类型的Exchange后,绑定到该Exchange的所有队列都会收到该消息的一条副本,
而消费者也能分别从不同的队列中读取消息,互不干扰。
▲ fanout类型的Exchange可以很好地模拟JMS的Pub-Sub消息模型。

代码演示:
都是在前面一篇的代码基础上修改的。
需求:使用 fanout 类型的Exchange ,实行发布-订阅的功能,其实就是创建一个生产者和两个消费者,实现广播模式的消息分发。

生产者:producer
在生产者中声明Exchange ,然后声明两个消息队列 Queue,
然后给这个Exchange 绑定 这个两个Queue


消费者:Consumer01
两个消费者的代码没啥区别,
消费方法的参数 autoAck 都是true, 都是自动确认消费。
两个消费者各自消费自己指定的消息队列。


消费者:Consumer02


测试结果
消费生产者发送10条消息,两个消费者都能各自消费到10条消息就是正确的。
消息生产者使用fanout这个广播的类型发送消息。

两个消费者都能消费到10条消息,正确。

完整代码
ConnectionUtil
package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory = new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}
Publisher
package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用fanout类型的exchange------就是广播模式
public class Publisher
{//常量:定义个Exchange的名字作为常量public static final String EXCHANGE_NAME = "myex01.fanout";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定channel.exchangeDeclare(EXCHANGE_NAME,/* Exchange名字 */BuiltinExchangeType.FANOUT,/* Exchange 类型 */true,/* 是否持久化 */false,/* 是否自动栅除 */false,/* 是否为内部的 Exchange */null /* 指定 Exchange 的额外属性 */);//声明多个消息队列------声明第1个消息队列channel.queueDeclare(Consumer01.QUEUE01, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第一个消息队列channel.queueBind(Consumer01.QUEUE01,EXCHANGE_NAME,"" /* 因为Exchange 是fanout类型,所以无需 路由key */,null /* 指定 Exchange 的额外属性 */);//声明第2个消息队列channel.queueDeclare(Consumer02.QUEUE02, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第2个消息队列channel.queueBind(Consumer02.QUEUE02,EXCHANGE_NAME,"" /* 因为Exchange 是fanout类型,所以无需 路由key */,null /* 指定 Exchange 的额外属性 */);//生产者发送10条消息for (int i = 1; i <= 10; i++){String message = "生产者发送的第【 " + i + " 】条消息的内容";//4、调用Channel的basicPublish()方法发送消息channel.basicPublish(EXCHANGE_NAME /* 向这个 fanout类型的 Exchange 发送消息 */,"" /* 因为 Exchange 是fanout 类型,所以有没有路由key都无所谓 */,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);System.out.println("生产者发送【 "+i+" 】条消息完成");}//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}
Consumer01
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者1
public class Consumer01
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列, Declare:声明、宣布// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE01 = "firstQueue";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(QUEUE01, /* 声明的队列名 */true, /* 消息队列是否持久化 */false, /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE01 /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}
}
Consumer02
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者2
public class Consumer02
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列, Declare:声明、宣布// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE02 = "secondQueue";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,// 如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(QUEUE02, /* 声明的队列名 */true, /* 消息队列是否持久化 */false, /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE02 /*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmq_fanout</artifactId><version>1.0.0</version><name>rabbitmq_fanout</name><!-- 属性 --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!-- 依赖 --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency></dependencies></project>
相关文章:
202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型
目录 ★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型代码演示:生产者:producer消费者:Consumer01消费者:Consumer02测试结果 完整代码ConnectionUtilPublisherConsumer01Consumer02pom.xml ★ 使用 fanout 类型的Exchange …...
web 性能优化详解(Lighthouse工具、优化方式、强缓存和协商缓存、代码优化、算法优化)
1.性能优化包含的方面 优化性能概念宽泛,可以从信号、系统、计算机原理、操作系统、网络通信、DNS解析、负载均衡、页面渲染。只要结合一个实际例子讲述清楚即可。 2.什么是性能? Web 性能是客观的衡量标准,是用户对加载时间和运行时的直观…...
docker-compose部署elk(8.9.0)并开启ssl认证
docker部署elk并开启ssl认证 docker-compose部署elk部署所需yml文件 —— docker-compose-elk.yml部署配置elasticsearch和kibana并开启ssl配置基础数据认证配置elasticsearch和kibana开启https访问 配置logstash创建springboot项目进行测试kibana创建视图,查询日志…...
解决java.lang.IllegalArgumentException: servlet映射中的<url pattern>[demo1]无效
当我使用tomcat启动使用servlet项目时,出现了报错: java.lang.IllegalArgumentException: servlet映射中的<url pattern>[demo1]无效 显示路径错误,于是去检查Web.xml中的配置,发现是配置文件的路径写错了,少写了…...
软件测试学习(三)易用性测试、测试文档、软件安全性测试、网站测试
目录 易用性测试 用户界面测试 优秀Ul由什么构成 符合标准和规范 直观 一致 灵活 舒适 正确 实用 为有残疾障碍的人员测试:辅助选项测试 测试文档 软件文档的类型 文档测试的重要性 软件安全性测试 了解黑客的动机 威胁模式分析 网站测试 网页基…...
Java中,对象一定在堆中分配吗?
在我们的日常编程实践中,我们经常会遇到各种类型的对象,比如字符串、列表、自定义类等等。这些对象在内存中是如何存储的呢? 你可能会毫不犹豫地回答:“在堆中!”如果你这样回答了,那你大部分情况下是正确…...
AI:38-基于深度学习的抽烟行为检测
🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏包含以下学习方向: 机器学习、深度学…...
Hadoop 配置 Kerberos 认证
1、安装 Kerberos 服务器和客户端 1.1 规划 服务端: bigdata3 客户端(Hadoop集群): bigdata0 bigdata1 bigdata2 192.168.50.7 bigdata0.example.com bigdata0 192.168.50.8 bigdata1.example.com bigdata1 192.168.50.9 b…...
在 Elasticsearch 中实现自动完成功能 2:n-gram
在第一部分中,我们讨论了使用前缀查询,这是一种自动完成的查询时间方法。 在这篇文章中,我们将讨论 n-gram - 一种索引时间方法,它在基本标记化后生成额外的分词,以便我们稍后在查询时能够获得更快的前缀匹配。 但在此…...
美客多、亚马逊卖家如何运用自养账号进行有效测评?
到了10月,卖家朋友们都在忙着准备Q4旺季吧! 首先,祝愿所有看到这条推文的卖家朋友,今年旺季都能爆单,赚得盆满钵满! 测评是珑哥常谈,一直备受关注,不论是新老卖家都是一个逃不开的…...
MyBatis的缓存,一级缓存,二级缓存
10、MyBatis的缓存 10.1、MyBatis的一级缓存 一级缓存是SqlSession级别的,通过同一个SqlSession对象 查询的结果数据会被缓存,下次执行相同的查询语句,就 会从缓存中(缓存在内存里)直接获取,不会重新访问…...
GitLab(1)——GitLab安装
目录 一、使用设备 二、使用rpm包安装 Gitlab国内清华源下载地址: ①下载命令如下: ②安装命令如下: ③删除rpm包 ④配置 ⑤重载 ⑥重启 ⑦配置自启动 ⑧打开8989端口并重启防火墙 三、GitLab登录 ①访问GitLab的URL ②输入用户…...
退税政策线上VR互动科普展厅为税收工作带来了强大活力
缴税纳税是每个公民应尽的义务和责任,由于很多人缺乏专业的缴税纳税操作专业知识和经验,因此为了提高大家的缴税纳税办事效率和好感度,越来越多地区税务局开始引进VR虚拟现实、web3d开发和多媒体等技术手段,基于线上为广大公民提供…...
centos 7.9离线安装wget
1.下载安装包 登录到wget官网上下载最新的wget的rpm安装包到本地 http://mirrors.163.com/centos/7/os/x86_64/Packages/ 2.上传安装包到服务器 3.安装 rpm -ivh wget-1.14-18.el7_6.1.x86_64.rpm 4.查看版本 wget -V...
【Java学习之道】网络编程的基本概念
引言 这一章我们将一同进入网络编程的世界。在开始学习网络编程之前,我们需要先了解一些基本概念。那么,我们就从“什么是网络编程”这个问题开始吧。 一、网络编程的基本概念 1.1 什么是网络编程 网络编程,顾名思义,就是利用…...
Restful API 设计示例
Restful API 设计示例 一 ,HTTP状态码 ✔️正例: 200: 返回成功 说明:200表示成功,4xx表示客户端异常,5xx表示服务端异常,参见HTTP 的返回码含义 ❌反例: 除了200就是500 说明࿱…...
为知笔记一个日记模板
<!DOCTYPE HTML><html><head> <meta http-equiv"Content-Type" content"text/html; charsetunicode"> <title>日记:</title><style id"wiz_custom_css">html, .wiz-editor-body {font-siz…...
软件测试中如何测试算法?
广义的算法是指解决问题的方案,小到求解数学题,大到制定商业策略,都可以叫做算法。而我们 今天讨论的软件测试中的算法,对应的英文单词为Algorithm ,专指计算机处理复杂问题的程序或 指令。 随着最近几年人工智能等领域的快速发展,算法受到前所未有的重视,算法测试也随之兴起。…...
CMOS图像传感器——Sony Ta-Kuchi图像传感器
2023 年国际图像传感器研讨会于 5 月在苏格兰克里夫举行,第四场会议重点关注汽车传感器,汽车应用中 CMOS 图像传感器 (CIS) 的技术要求与消费(移动)设备中的要求不同。毕竟,很少有人关心车载摄像头的像素数或图像美观度。主要驱动因素是安全性、可靠性和成本。 而汽车领域…...
一文理解登录鉴权(Cookie、Session、Jwt、CAS、SSO)
1 前言 登录鉴权是任何一个网站都无法绕开的部分,当系统要正式上线前都会要求接入统一登陆系统,一方面能够让网站只允许合法的用户访问,另一方面,当用户在网站上进行操作时也需要识别操作的用户,用作后期的操作审计。…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
如何在看板中体现优先级变化
在看板中有效体现优先级变化的关键措施包括:采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中,设置任务排序规则尤其重要,因为它让看板视觉上直观地体…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
git: early EOF
macOS报错: Initialized empty Git repository in /usr/local/Homebrew/Library/Taps/homebrew/homebrew-core/.git/ remote: Enumerating objects: 2691797, done. remote: Counting objects: 100% (1760/1760), done. remote: Compressing objects: 100% (636/636…...
uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)
UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...
图解JavaScript原型:原型链及其分析 | JavaScript图解
忽略该图的细节(如内存地址值没有用二进制) 以下是对该图进一步的理解和总结 1. JS 对象概念的辨析 对象是什么:保存在堆中一块区域,同时在栈中有一块区域保存其在堆中的地址(也就是我们通常说的该变量指向谁&…...
