当前位置: 首页 > news >正文

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

目录

一、消息队列

1.1、发布确认模式

1.2、案例代码

(1)引入依赖

(2)编写生产者【消息确认--单条确认】

(3)编写生产者【消息确认--批量确认】

(4)编写生产者【消息确认--异步确认】


一、消息队列

1.1、发布确认模式

RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???

RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。

发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。

1.2、案例代码

(1)引入依赖

<!-- 引入 RabbitMQ 依赖 -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>

(2)编写生产者【消息确认--单条确认】

  • 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
  • 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
  • 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class Producer {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 等待RabbitMQ返回ACK标识boolean wait = channel.waitForConfirms();System.out.println("RabbitMQ是否接收成功: " + wait);if (!wait) {// 消息发送失败,则可以重新发送channel.basicPublish(exchangeName, routingKey, null, message.getBytes());}}} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

(3)编写生产者【消息确认--批量确认】

  • 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
  • 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
  • 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
  • 所以,可以通过捕获IOException异常来判断消息是否发送成功。
  • 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerBatch {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_direct_2023";channel.exchangeDeclare(exchangeName, "direct");// 6、发送消息int batchSize = 3;int count = 0;for (int i = 0; i < 10; i++) {// 路由键唯一标识String routingKey = "error";if (i % 3 == 0) {routingKey = "info";} else if (i % 3 == 1) {routingKey = "warn";}String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";channel.basicPublish(exchangeName, routingKey, null, message.getBytes());// 批量确认if (count == batchSize) {// 等待RabbitMQ返回ACK标识channel.waitForConfirmsOrDie();count = 0;}count++;}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

(4)编写生产者【消息确认--异步确认】

  • 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
  • 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
package com.rabbitmq.demo.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @version 1.0.0* @Date: 2023/2/25 16:23* @Copyright (C) ZhuYouBin* @Description: 消息生产者*/
public class ProducerAsync {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2、设置连接的 RabbitMQ 服务地址factory.setHost("127.0.0.1"); // 默认就是本机factory.setPort(5672); // 默认就是 5672 端口// 3、获取连接Connection connection = null; // 连接Channel channel = null; // 通道try {connection = factory.newConnection();// 4、获取通道channel = connection.createChannel();// TODO 开启消息确认机制channel.confirmSelect();// 5、声明 Exchange,如果不存在,则会创建String exchangeName = "exchange_confirm_2023";channel.exchangeDeclare(exchangeName, "direct");// TODO 一定要先调用监听接口,在发送消息channel.addConfirmListener(new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag + ",批量发送多条消息multiple=" + multiple);}}, new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {System.out.println("RabbitMQ接收失败啦.....");}});for (int i = 0; i < 10; i++) {// 6、发送消息String message = "这是发布确认模式,发送的消息数据";channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());}} catch (IOException e) {System.out.println("消息发送失败啦");} catch (Exception e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (Exception e) {}}if (null != connection) {try {connection.close();} catch (Exception e) {}}}}
}

到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。

综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

相关文章:

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式

这篇文章&#xff0c;主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。 目录 一、消息队列 1.1、发布确认模式 1.2、案例代码 &#xff08;1&#xff09;引入依赖 &#xff08;2&#xff09;编写生产者【消息确认--单条确认】 &#xff08;3&#xf…...

【华为OD机试模拟题】用 C++ 实现 - IPv4 地址转换成整数(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 去重求和(2023.Q1) 文章目录 最近更新的博客使用说明IPv4 地址转换成整数题目输入输出示例一输入输出说明示例一输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,...

闭包与高阶函数

文中内容均来自于曾探《JavaScript设计模式与开发实践》的学习笔记。闭包作用域变量的作用域&#xff0c;就是指变量的有效范围。局部变量、全局变量。变量的搜索是从内到外而非从外到内的。变量的生命周期对于全局变量莱索&#xff0c;全局变量的生命周期是永久的&#xff0c;…...

人工智能轨道交通行业周刊-第35期(2023.2.20-2.26)

本期关键词&#xff1a;重庆智慧轨道、智能运维主机、标准轨距、地方铁路公报、景深、机器视觉应用 1 整理涉及公众号名单 1.1 行业类 RT轨道交通人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通Rai…...

快慢指针判断链表是否有环

快慢指针判断链表是否有环 单链表有可能存在环&#xff0c;有些情况下要判断一个单链表是否有环。数组的有个快慢指针的方法&#xff0c;其实单链表和数组有相似的地方&#xff0c;可以使用快慢指针的方法。具体做法如下&#xff1a; 首先创建两个指针&#xff0c;它们初始时…...

《MongoDB入门教程》第26篇 聚合统计之$max/$min表达式

本文将会介绍两个 MongoDB 表达式&#xff0c;返回一组数据中最大值的 $max 表达式&#xff0c;以及返回一组数据中最小值的 $min 表达式。 $max 表达式 $max 表达式用于返回一组数据中的最大值&#xff0c;语法如下&#xff1a; { $max: <expression> }$max 表达式在…...

FPGA纯verilog解码SDI视频 纯逻辑资源实现 提供2套工程源码和技术支持

目录1、前言2、硬件电路解析SDI摄像头Gv8601a单端转差GTX解串SDI解码VGA时序恢复YUV转RGB图像输出FDMA图像缓存HDMI输出3、工程1详解&#xff1a;无缓存输出4、工程2详解&#xff1a;缓存3帧输出5、上板调试验证并演示6、福利&#xff1a;工程代码的获取1、前言 FPGA实现SDI视…...

JVM篇之垃圾回收

一.如何判断对象可以回收 1.引用计数法 只要一个对象被其他变量所引用&#xff0c;就让它的计数加1&#xff0c;被引用了两次就让它的计数变成2&#xff0c;当这个变量的计数变成0时&#xff0c;就可以被垃圾回收&#xff1b; 弊端&#xff1a;当出现如下图的情况&#xff0…...

尝试用程序计算Π(3.141592653......)

文章目录1. π\piπ2. 用微积分来计算π\piπ2.1 原理2.2 代码2.3 结果2.4 分析1. π\piπ π\piπ的重要性或者地位不用多说&#xff0c;有时候还是很好奇&#xff0c;精确地π\piπ值是怎么计算出来的。研究π\piπ的精确计算应该是很多数学家计算机科学家努力的方向&#xf…...

【异常检测三件套】系列3--时序异常检测综述

写在前面: 异常检测共包含3个内容,从多个方面剖析异常检测方法,本文为第三篇。过往内容请查看以下链接: 【异常检测三件套】系列1--14种异常检测算法https://blog.csdn.net/allein_STR/article/details/128114175?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%…...

关于SAP 错误日志解析

有时候启动或操作sap会出现故障&#xff0c;只是察看sap用户当前目录下的日志文件可能不得要领&#xff0c;此时有必要察看work目录下的一些trace. 以Linux系统为例&#xff0c;其他的也差不多。 instance说明 如下 DVEBMGS?? ABAP Central Instance D?? …...

java:自定义变量加载到系统变量后替换shell模版并执行shell

这里的需求前提是&#xff0c;在项目中进行某些操作前&#xff0c;需要在命令后对shell配置文件的进行修改&#xff08;如ip、port&#xff09;&#xff0c;这个对于用户是不友好的&#xff0c;需要改为用户页面输入ip、port&#xff0c;后台自动去操作修改配置&#xff1b;那么…...

Redis高级删除策略与数据淘汰

第二章&#xff1a;Redis高级 学习目标 目标1&#xff1a;能够说出redis中的数据删除策与略淘汰策略 目标2&#xff1a;能够说出主从复制的概念&#xff0c;工作流程以及场景问题及解决方案 目标3&#xff1a;能够说出哨兵的作用以及工作原理&#xff0c;以及如何启用哨兵 …...

社畜大学生的Python之pandas学习笔记,保姆入门级教学

接上期&#xff0c;上篇介绍了 NumPy&#xff0c;本篇介绍 pandas。 目录 pandas 入门pandas 的数据结构介绍基本功能汇总和计算描述统计处理缺失数据层次化索引 pandas 入门 Pandas 是基于 Numpy 构建的&#xff0c;让以 NumPy 为中心的应用变的更加简单。 Pandas是基于Numpy…...

20_FreeRTOS低功耗模式

目录 低功耗模式简介 STM32低功耗模式 Tickless模式详解 Tickless模式相关配置 实验源码 低功耗模式简介 很多应用场合对于功耗的要求很严格,比如可穿戴低功耗产品、物联网低功耗产品等。 一般MCU都有相应的低功耗模式,裸机开发时可以使用MCU的低功耗模式。 FreeRTOS也…...

Hive的使用方式

操作Hive可以在Shell命令行下操作&#xff0c;或者是使用JDBC代码的方式操作 针对命令行这种方式&#xff0c;其实还有两种使用 第一个是使用bin目录下的hive命令&#xff0c;这个是从hive一开始就支持的使用方式 后来又出现一个beeline命令&#xff0c;它是通过HiveServer2服…...

Flume三大核心组件

Flume的三大核心组件&#xff1a; Source&#xff1a;数据源 Channel&#xff1a;临时存储数据的管道 Sink&#xff1a;目的地 Source&#xff1a;数据源&#xff1a;通过source组件可以指定让Flume读取哪里的数据&#xff0c;然后将数据传递给后面的 channel Flume内置支持读…...

数据结构(六)二叉树

一、树形结构概念树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。它具有以下的特点&#xff1a;1、有一个…...

Docker buildx 的跨平台编译

docker buildx 默认的 docker build 命令无法完成跨平台构建任务&#xff0c;我们需要为 docker 命令行安装 buildx 插件扩展其功能。buildx 能够使用由 Moby BuildKit 提供的构建镜像额外特性&#xff0c;它能够创建多个 builder 实例&#xff0c;在多个节点并行地执行构建任…...

【java基础】方法重载和方法重写

文章目录方法重载方法重写方法重载 方法重载就是可以在一个类里面定义多个相同名称的方法&#xff0c;只需要参数列表的个数或者类型不同就行。 public class Overload {public int add(int a, int b) {return a b;}public double add(double a, double b) {return a b;}}对…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

如何在看板中有效管理突发紧急任务

在看板中有效管理突发紧急任务需要&#xff1a;设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP&#xff08;Work-in-Progress&#xff09;弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中&#xff0c;设立专门的紧急任务通道尤为重要&#xff0c;这能…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

Map相关知识

数据结构 二叉树 二叉树&#xff0c;顾名思义&#xff0c;每个节点最多有两个“叉”&#xff0c;也就是两个子节点&#xff0c;分别是左子 节点和右子节点。不过&#xff0c;二叉树并不要求每个节点都有两个子节点&#xff0c;有的节点只 有左子节点&#xff0c;有的节点只有…...

稳定币的深度剖析与展望

一、引言 在当今数字化浪潮席卷全球的时代&#xff0c;加密货币作为一种新兴的金融现象&#xff0c;正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而&#xff0c;加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下&#xff0c;稳定…...

AI病理诊断七剑下天山,医疗未来触手可及

一、病理诊断困局&#xff1a;刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断"&#xff0c;医生需通过显微镜观察组织切片&#xff0c;在细胞迷宫中捕捉癌变信号。某省病理质控报告显示&#xff0c;基层医院误诊率达12%-15%&#xff0c;专家会诊…...

免费PDF转图片工具

免费PDF转图片工具 一款简单易用的PDF转图片工具&#xff0c;可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件&#xff0c;也不需要在线上传文件&#xff0c;保护您的隐私。 工具截图 主要特点 &#x1f680; 快速转换&#xff1a;本地转换&#xff0c;无需等待上…...

mac 安装homebrew (nvm 及git)

mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用&#xff1a; 方法一&#xff1a;使用 Homebrew 安装 Git&#xff08;推荐&#xff09; 步骤如下&#xff1a;打开终端&#xff08;Terminal.app&#xff09; 1.安装 Homebrew…...

Golang——6、指针和结构体

指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...