当前位置: 首页 > news >正文

RabbitMQ深入 —— 持久化和发布确认

前言

        前面的文章荔枝梳理了如何去配置RabbitMQ环境并且也介绍了两种比较简单的运行模式,在这篇文章中荔枝将会继续梳理有关RabbitMQ的持久化机制以及发布确认模式的相关知识,希望能够帮助到大家~~~


文章目录

前言

一、持久化

1.1 队列持久化

1.2 消息持久化

1.3 不公平分发

1.4 预取值

二、发布确认机制

2.1 单个发布确认

2.2 批量发布确认

2.3 异步发布确认

2.4 处理异步未确认消息的机制 

总结


一、持久化

跟MySQL和Redis一样,持久化的操作就是为了保存数据,避免因为RabbitMQ宕机停止服务之后可以确保消息不丢失。默认情况下RabbitMQ不会开启持久化,他会忽视队列和消息。 

1.1 队列持久化

        队列的持久化比较简单,只需要在生产者创建信道的时候将queueDeclare()函数的durable参数设置为ture即可。如果之前就已经创建过该队列,修改完durable之后可能会报错,这时候仅需要管理后台中删除队列即可。 

boolean durable = ture;
//信道申明
channel.queueDeclare(TASK_QUEUE_ACK, durable ,false,false,null);

1.2 消息持久化

消息持久化的实现也需要在消息生产者中修改demo,只需要在basicPublish()中设置第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN。

channel.basicPublish("",TASK_QUEUE_ACK, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

        需要注意的是:只有消息持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言已经足够,如果需要更强地保证持久化,可能要用到发布确认模式。

1.3 不公平分发

默认情况下RabbitMQ对队列中的消息采用的是轮询分发的机制,但这种机制会造成资源浪费,因为处理消息的快的消费者更多的时间是处于空闲状态。不公平分发的机制开启比较简单,只需要在消费者处的代码设置信道时,设置其basicQos的值为1即可。

channel.basicQos(1);

1.4 预取值

        由于消息的确认和发送都是异步的,因此消费者处会有一个未确认消息的缓冲区,为了使得该缓冲区不会无限制地增大,可以通过basic.qos来设置消费者未确认消息的缓冲区中允许的未确认的消息的最大数量。一旦达到最大数量就会RabbitMQ就会停止在通道上传递更多消息直至至少一个未被确认的消息被确认处理。这时候为了尽量避免自动分发和手动分发模式下无限制确认消息缓冲区所造成了消费者的RAM消耗,我们需要根据需求场景设置一个比较好的预取值。

//预取值是10
int prefetchCount = 10;
channel.basicQos(prefetchCount);

 需要注意的是预取值的设置是在消费者一方!


二、发布确认机制

在前面我们设置了队列持久化和消息持久化,但是单纯开启持久化并不能保证完全持久化,消息有可能还没来得及保存在磁盘中就发生了RabbitMQ宕机,因此我们还需要引入发布确认模式。

在消息发布者处开启发布确认模式

//开启发布确认模式
channel.confirmSelect();

下面我们来看看几种确认发布的模式:

2.1 单个发布确认

        单个发布确认是一种简单的确认方式,它是一种同步确认发布的方式。也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirms()这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

缺点:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。

    //单个确认发布public static void publishMessageIndividually() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//循环发布消息for(int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功"+i);}}long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("单次发布模式下发送"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}

2.2 批量发布确认

        批量消息确认发布是指不再是像单个发布确认的模式那样发一条消息等待确认后再发一条,而是一次性发送一批消息之后再统一确认发布。批量发布确认确实可以极大的提高消息队列的吞吐量,但缺点却是不清楚哪个消息出现问题。

    //批量确认发布public static void publishMessageBatch() throws Exception {Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量操作的信息大小int batchSize = 10;for (int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//判断是否达到sizeif (i%batchSize==0){channel.waitForConfirms();}}long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("批量确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}

2.3 异步发布确认

        相比于前面两种发布确认模式,异步确认发布在实现起来更加复杂,但是性能和效率更高。它是通过回调函数来达到消息可靠性传递和保证消息投递成功的。在异步发布确认模式下一般至少有三个线程,一个线程是用来发送消息的;另外两个线程分别是处理消息确认成功和消息确认失败的回调函数。

    //异步确认发布public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//消息确认成功的回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{System.out.println("确认的消息:"+deliveryTag);};//消息确认失败的回调函数(消息的标记,是否为批量处理)ConfirmCallback nackCallback = (deliveryTag,multiple)->{System.out.println("未确认的消息:"+deliveryTag);};//设置消息监听器channel.addConfirmListener(ackCallback,nackCallback); //这个监听是异步的//消息发送for(int i = 0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());}//结束时间long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("异步确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}

2.4 处理异步未确认消息的机制 

        处理异步未确认消息的最好解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentSkipListMap这个队列在confirm callbacks与发布线程之间进行消息的传递。 这里为了存储发送的消息体我们使用的是ConcurrentSkipListMap这个类型对象,该类对象是线程安全的有序的哈希表,适用于高并发的场景。

public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();/*** ConcurrentSkipListMap线程安全有序的一个哈希表,适用于高并发的场景下* 1.轻松的关联序号和消息* 2.轻松批量删除条目* 3.支持高并发场景*/ConcurrentSkipListMap<Long,String> outstandingConfirm = new ConcurrentSkipListMap<>();//消息确认成功的回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{//删除掉已经确认的消息 剩下未确认的消息//判断是不是批量删除if(multiple){ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirm.headMap(deliveryTag);confirmed.clear();}else {outstandingConfirm.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败的回调函数(消息的标记,是否为批量处理)ConfirmCallback nackCallback = (deliveryTag,multiple)->{//打印未确认的消息String message = outstandingConfirm.get(deliveryTag);System.out.println("未确认消息是"+message+"未确认的消息的标签:"+deliveryTag);};//设置消息监听器channel.addConfirmListener(ackCallback,nackCallback); //这个监听是异步的//消息发送for(int i = 0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//记录下所有的发送消息outstandingConfirm.put(channel.getNextPublishSeqNo(), message);}//结束时间long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("异步确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}

        outstandingConfirm.headMap(deliveryTag)是为了在批量操作中做一些过滤,返回小于deliveryTag的映射并调用clear方法清空outstandingConfirm 映射中所有小于 deliveryTag 的键值对,而不影响其他键值对。这里其实就是清除所有已经确认的消息,留下未确认的消息。 


总结

        这篇文章的知识都比较简单,其实中间件的设计都是为了解决问题的,而我们也可以通过保证消息不丢失的同时兼顾性能这个需求来学习这部分知识。其余的就是基本的操作和类库操作的熟练了哈哈哈哈,继续梳理的同时,荔枝也要继续努力学习咯~~~

今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~

如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!

如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!

相关文章:

RabbitMQ深入 —— 持久化和发布确认

前言 前面的文章荔枝梳理了如何去配置RabbitMQ环境并且也介绍了两种比较简单的运行模式&#xff0c;在这篇文章中荔枝将会继续梳理有关RabbitMQ的持久化机制以及发布确认模式的相关知识&#xff0c;希望能够帮助到大家~~~ 文章目录 前言 一、持久化 1.1 队列持久化 1.2 消息…...

人脸识别三部曲

人脸识别三部曲 首先看目录结构图像信息采集 采集图片.py模型训练 训练模型.py人脸识别 人脸识别.py效果 首先看目录结构 引用文121本 opencv │ 采集图片.py │ 训练模型.py │ 人脸识别.py │ └───trainer │ │ trainer.yml │ └───data │ └──…...

【Linux网络编程】Socket-TCP实例

netstat -nltp 无法用read函数读取UDP套接字的数据&#xff0c;因为UDP是面向数据报&#xff0c;而TCP是面向数据流。 客户端不需要 bind&#xff0c;listen&#xff0c;accept&#xff0c;但是客户端需要connect&#xff0c;connect会自动做bind工作。 #include <sys/sock…...

<OpenCV> 边缘填充

OpenCV边缘填充 1、边缘填充类型 enum cv::BorderTypes ORDER_CONSTANT iiiiii|abcdefgh|iiiiiii with some specified i -常量法&#xff0c;常熟值填充&#xff1b; BORDER_REPLICATE aaaaaa|abcdefgh|hhhhhhh -复制法&#xff0c;复制边缘像素&#xff1b; BORDER_R…...

【视觉SLAM入门】7.3.后端优化 基于KF/EKF和基于BA图优化的后端,推导及举例分析

"时间倾诉我的故事" 1. 理论推导2. 主流解法3. 用EKF估计状态3.1. 基于EKF代表解法的感悟 4. 用BA法估计状态4.1 构建最小二乘问题4.2 求解BA推导4.3 H的稀疏结构4.4 根据H稀疏性求解4.5 鲁棒核函数4.6 编程注意 5.总结 引入&#xff1a; 前端里程计能给出一个短时间…...

Docker概念通讲

目录 什么是Docker&#xff1f; Docker的应用场景有哪些&#xff1f; Docker的优点有哪些&#xff1f; Docker与虚拟机的区别是什么&#xff1f; Docker的三大核心是什么&#xff1f; 如何快速安装Docker&#xff1f; 如何修改Docker的存储位置&#xff1f; Docker镜像常…...

PHP请求API接口案例采集电商平台数据获取淘宝/天猫优惠券查询示例

优惠券查询API接口对于用户和商家来说具有重要作用&#xff0c;可以方便地获取优惠券信息&#xff0c;进行优惠券搜索和筛选&#xff0c;参与活动和促销推广&#xff0c;提供数据分析和决策支持&#xff0c;提升用户体验和忠诚度&#xff0c;为商家增加销售额和市场竞争力。 t…...

计算机网络:三次握手与四次挥手

摘取作者&#xff1a;拓跋阿秀 三次握手 三次握手&#xff08;Three-way Handshake&#xff09;其实就是指建立一个TCP连接时&#xff0c;需要客户端和服务器总共发送3个包。进行三次握手的主要作用就是为了确认双方的接收能力和发送能力是否正常、指定自己的初始化序列号为后…...

Visual Studio 调试上传文件时自动停止运行的解决方法

进入&#xff1a;选项&#xff0c;项目和解决方案&#xff0c;Web项目&#xff0c; 找到在浏览器窗口关闭时停止调试程序&#xff0c;在调试停止时关闭浏览器 将它不要勾关闭&#xff0c;然后重新启动下Visual Studio&#xff0c;上传文件时就可以调试了...

使用scp命令失败出错

使用scp命令失败出错&#xff0c;无反应。 解决&#xff1a; 1.使用ifconfig查看目标主机公网IP地址 ifconfig需使用公网ip 2.配置免密登录 可参考 远程登录ssh ssh-copy-id root目标主机ip再次尝试scp命令。 SCP&#xff08;Secure Copy&#xff09;是一个用于在本地主机和…...

kafka增加磁盘或者分区,topic重分区

场景&#xff1a;kafka配置文件log.dirs增加了几个目录&#xff0c;但是新目录没有分区数据写入&#xff0c;所以打算进行重分区一下。 1.生成迁移计划 进入kafka/bin目录 新建 topic-reassign.json,把要重分区的topic按下面格式写。 { "topics": [{ …...

SpringMVC系列(五)之JSR303和拦截器

目录 一. JSR303 1.1 JSR303是什么 1.2 为什么要使用JSR303 1.3 JSR303常用注解 1.4 JSR303快速入门 1. 导入相关pom依赖 2. 配置校验规则 3. 入门示例 二. SpringMVC的拦截器 2.1 什么是拦截器 2.2 拦截器与过滤器的区别 2.3 拦截器工作原理 2.4 入门示例 1. 创建…...

LCP 01.猜数字

​​题目来源&#xff1a; leetcode题目&#xff0c;网址&#xff1a;LCP 01. 猜数字 - 力扣&#xff08;LeetCode&#xff09; 解题思路&#xff1a; 遍历比较即可。 解题代码&#xff1a; class Solution {public int game(int[] guess, int[] answer) {int res0;for(int …...

智能小车开发

1.材料 店铺&#xff1a;店内搜索页-risym旗舰店-天猫Tmall.com 1.四个小车轮子 2.四个直流减速电机 3.两节18650锂电池&#xff08;每节3.7V&#xff09;&#xff0c;大概电压在7.4V左右&#xff0c;电压最好不要超过12V不然会损坏电机驱动 4.一个18650锂电池盒 5.一个L…...

RDMA性能测试工具集preftest_README

文章目录 1 概述2 安装3 测试方法说明4 测试说明5 运行测试所有测试的通用选项延迟测试选项带宽测试选项ib_send_lat&#xff08;发送延迟测试&#xff09;和 ib_send_bw&#xff08;发送带宽测试&#xff09;的选项ib_atomic_lat&#xff08;原子延迟测试&#xff09;和 ib_at…...

墨天轮专访星环科技刘熙:“向量热”背后的冷思考,Hippo如何打造“先发”优势?

导读&#xff1a; 深耕技术研发数十载&#xff0c;坚持自主可控发展路。星环科技一路砥砺前行、坚持创新为先&#xff0c;建设了全面的产品矩阵&#xff0c;并于2022年作为首个独立基础软件产品公司成功上市。星环科技在今年的向星力•未来技术大会上发布了分布式向量数据库Tra…...

逆向-beginners之非递归

/* * 非递归 */ void f() { } void main() { f(); } #if 0 /* * intel */ 0000000000001129 <f>: 1129: f3 0f 1e fa endbr64 112d: 55 push %rbp 112e: 48 89 e5 mov %rsp,%…...

Spring for Apache Kafka概述和简单入门

一、概述 Spring for Apache Kafka 的高级概述以及底层概念和可运行的示例代码。 二、准备工作 注意:进行工作开始之前至少要有一个 Apache Kafka 环境 2.1、依赖 使用 Spring Boot<dependency><groupId>org.springframework.kafka</groupId><artifact…...

基于SSM+Vue的医院医患管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用Vue技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…...

再次理解Android账号管理体系

目录 ✅ 0. 需求 &#x1f4c2; 1. 前言 &#x1f531; 2. 使用 2.1 账户体系前提 2.2 创建账户服务 2.3 操作账户-增删改查 &#x1f4a0; 3. 源码流程 ✅ 0. 需求 试想&#xff0c;自己去实现一个账号管理体系&#xff0c;该如何做呢&#xff1f; ——————————…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

【2025年】解决Burpsuite抓不到https包的问题

环境&#xff1a;windows11 burpsuite:2025.5 在抓取https网站时&#xff0c;burpsuite抓取不到https数据包&#xff0c;只显示&#xff1a; 解决该问题只需如下三个步骤&#xff1a; 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点&#xff0c;但无自动故障转移能力&#xff0c;Master宕机后需人工切换&#xff0c;期间消息可能无法读取。Slave仅存储数据&#xff0c;无法主动升级为Master响应请求&#xff…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...