【RabbitMQ】消息堆积、推拉模式
消息堆积
原因
消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因:
消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。
消费者处理能力不足:消费者消费消息的速度跟不上消息生产的速度,也会导致消息在队列中积压。可能的原因有:
- 消费端业务逻辑复杂、耗时长。
- 消费端代码性能低。
- 系统资源限制,如CPU、内存、磁盘等也会限制消费者处理消息的速率。
- 异常处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认。
网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压。
RabbitMQ服务器配置偏低
解决方案
消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃。因此,及时发现消息积压并解决对于维护系统稳定性至关重要。
遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。通常从以下几个方面考虑:
提高消费者效率
- 增加消费者实例数量,比如新增机器。
- 优化业务逻辑,比如使用多线程来处理业务。
- 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者。
- 消息发生异常时,设置合适的重试策略,或者转入到死信队列。
限制生产者效率:比如流量控制、限流算法等。
- 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送效率。
- 限流:使用限流工具,为消息发送效率设置一个上限。
- 设置过期时间:如果消息过期未被消费,可以配置死信队列,以避免消息丢失,同时减少对主队列的压力。
资源与配置优化:比如省级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等。
推拉模式
概述
RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull)。
推模式:消息中间件主动将消息推送给消费者(对消息的获取更加实时,适合对数据实时性要求较高的业务,例如实时数据处理:监控系统、报表系统等)。
拉模式:消费者主动从消息中间件拉取消息(消费端可以按照自己的处理速度来消费,避免消息积压,适合需要流量控制,或者需要大量计算资源的任务,拉取模式允许消费者准备好之后再请求消息,避免资源浪费)。
RabbitMQ主要就是基于推模式工作的,例如最开始谈到的几种工作模式,都是基于推模式来进行实现。它的设计核心是让消息队列中的消费者接收到由生产者发送的消息,使用channel.basicConsume方式订阅队列,MQ就会把消息推送到订阅该队列的消费者。如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方式来进行消费消息。
SpringBoot方式
@Configuration
public class PushAndPull {@Bean("pushQueue")public Queue pushQueue() {return QueueBuilder.durable(Constants.PUSH_QUEUE).build();}@Bean("pushExchange")public Exchange pushExchange() {return ExchangeBuilder.directExchange(Constants.PUSH_EXCHANGE).durable(true).build();}@Bean("pushQueueBind")public Binding pushQueueBind(@Qualifier("pushExchange") Exchange exchange,@Qualifier("pushQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("push").noargs();}@Bean("pullQueue")public Queue pullQueue() {return QueueBuilder.durable(Constants.PULL_QUEUE).build();}@Bean("pullExchange")public Exchange pullExchange() {return ExchangeBuilder.directExchange(Constants.PULL_EXCHANGE).durable(true).build();}@Bean("pullQueueBind")public Binding pullQueueBind(@Qualifier("pullExchange") Exchange exchange,@Qualifier("pullQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("pull").noargs();}
}
@Slf4j
@RestController
@RequestMapping("/pushAndPull")
public class PushAndPullController {@Resourceprivate RabbitTemplate rabbitTemplate;// 推模式@RequestMapping("/push")public void push() {this.rabbitTemplate.convertAndSend(Constants.PUSH_EXCHANGE, "push", "hello push");System.out.println("推模式消息发送成功!");}// 拉模式@RequestMapping("/pull")public void pull() {this.rabbitTemplate.convertAndSend(Constants.PULL_EXCHANGE, "pull", "hello pull");System.out.println("拉模式消息发送成功!");}}
@Configuration
@RestController
public class PushAndPullListener {// 推模式@RabbitListener(queues = Constants.PUSH_QUEUE)public void push(String message) {System.out.println("推模式: " + message);}@Resourceprivate RabbitTemplate rabbitTemplate;// 拉模式@RequestMapping("/pullConsumer")public void pull(String message) {Message receive = this.rabbitTemplate.receive(Constants.PULL_QUEUE);System.out.println("拉模式: " + receive);}}
SDK方式
// 推模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 发送消息channel.basicPublish(Constants.PUSH_EXCHANGE, "push", null, "推模式".getBytes());System.out.println("推模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 推模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");// TODO 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("推模式消费者接收到消息:" + new String(body));}};channel.basicConsume(Constants.PUSH_QUEUE, true, consumer);// TODO 释放资源}}
// 拉模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 发送消息channel.basicPublish(Constants.PULL_EXCHANGE, "pull", null, "拉模式".getBytes());System.out.println("拉模式发送消息成功!");// TODO 释放资源channel.close();connection.close();}}
// 拉模式消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// TODO 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("113.45.220.15"); // IPconnectionFactory.setPort(5672); // PORTconnectionFactory.setUsername("admin"); // 用户名connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机// TODO 创建连接Connection connection = connectionFactory.newConnection();// TODO 获取信道Channel channel = connection.createChannel();// TODO 声明交换机channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);// TODO 声明队列channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);// TODO 绑定交换机和队列channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");// TODO 接收消息GetResponse getResponse = channel.basicGet(Constants.PULL_QUEUE, true);if (getResponse != null) {System.out.println("拉模式收到消息:" + new String(getResponse.getBody()));}// TODO 释放资源}}相关文章:
【RabbitMQ】消息堆积、推拉模式
消息堆积 原因 消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因: 消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息&…...
MySQL常用SQL语句(持续更新中)
文章目录 数据库相关表相关索引相关添加索引 编码相关系统变量相关 收录一些经常用到的sql 数据库相关 建数据库 CREATE DATABASE [IF NOT EXISTS] <数据库名> [[DEFAULT] CHARACTER SET <字符集名>] [[DEFAULT] COLLATE <校对规则名>];例如: C…...
【更新】红色文化之红色博物馆数据集(经纬度+地址)
数据简介:红色博物馆作为国家红色文化传承与爱国主义教育的重要基地,遍布全国各地,承载着丰富的革命历史与文化记忆。本数据说明旨在汇总并分析全国范围内具有代表性的红色博物馆的基本信息,包括其地址、特色及教育意义࿰…...
Python项目Flask框架整合Redis
一、在配置文件中创建Redis连接信息 二、 实现Redis配置类 import redis from config.config import REDIS_HOST, REDIS_PORT, REDIS_PASSWD, REDIS_DB, EXPIRE_TIMEclass RedisDb():def __init__(self, REDIS_HOST, REDIS_PORT, REDIS_DB, EXPIRE_TIME, REDIS_PASSWD):# 建立…...
完整网络模型训练(一)
文章目录 一、网络模型的搭建二、网络模型正确性检验三、创建网络函数 一、网络模型的搭建 以CIFAR10数据集作为训练例子 准备数据集: #因为CIFAR10是属于PRL的数据集,所以需要转化成tensor数据集 train_data torchvision.datasets.CIFAR10(root&quo…...
高效便捷,体验不一样的韩语翻译神器
嘿,大家好啊!今天想跟大家聊聊我用过的几款翻译神器,特别是它们在翻译韩语时的那些小感受。作为一个偶尔需要啃啃韩语资料或者跟韩国朋友聊天的普通人,我真心觉得这些翻译工具简直就是我的救星! 一、福昕在线翻译 网址…...
Markdown笔记管理工具Haptic
什么是 Haptic ? Haptic 是一个新的本地优先、注重隐私的开源 Markdown 笔记管理工具。它简约、轻量、高效,旨在提供您所需的一切,而不包含多余的功能。 目前官方提供了 docker 和 Mac 客户端。 Haptic 仍在积极开发中。以下是未来计划的一些…...
网络原理-传输层UDP
上集回顾: 上一篇博客中讲述了应用层如何自定义协议:确定传输信息,确定数据格式 应用层也有一些现成的协议:HTTP协议 这一篇博客中来讲述传输层协议 传输层 socket api都是传输层协议提供的(操作系统内核实现的了…...
C++中,如何使你设计的迭代器被标准算法库所支持。
iterator(读写迭代器) const_iterator(只读迭代器) reverse_iterator(反向读写迭代器) const_reverse_iterator(反向只读迭代器) 以经常介绍的_DList类为例,它的迭代…...
Java NIO 全面详解:掌握 `Path` 和 `Files` 的一切
在 Java 7 中引入的 NIO (New I/O) 为文件系统和流的操作带来了强大的能力,其中 Path 和 Files 是核心部分。Path 作为对文件路径的抽象,提供了灵活的方式处理文件系统中的路径;Files 则通过一系列静态方法,使得文件的读写、复制、…...
bluez免提协议hands-free介绍,全到无法想象,bluez hfp ag介绍
零. 前言 由于Bluez的介绍文档有限,以及对Linux 系统/驱动概念、D-Bus 通信和蓝牙协议都有要求,加上网络上其实没有一个完整的介绍Bluez系列的文档,所以不管是蓝牙初学者还是蓝牙从业人员,都有不小的难度,学习曲线也相对较陡,所以我有了这个想法,专门对Bluez做一个系统…...
关于区块链的安全和隐私
背景 区块链技术在近年来发展迅速,被认为是安全计算的突破,但其安全和隐私问题在不同应用中的部署仍处于争论焦点。 目的 对区块链的安全和隐私进行全面综述,帮助读者深入了解区块链的相关概念、属性、技术和系统。 结构 首先介绍区块链…...
特征工程——一门提高机器学习性能的艺术
当前围绕人工智能(AI)和机器学习(ML)展开的许多讨论以模型为中心,聚焦于 ML和深度学习(DL)的最新进展。这种模型优先的方法往往对用于训练这些模型的数据关注不足,甚至完全忽视。类似MLOps的领域正迅速发展,通过系统性地训练和利用ML模型&…...
Paper解读:工作场所人机协作的团队形成:促进组织变革的目标编程模型
人工智能(AI)具有降低运营成本、提高效率和改善客户体验的潜力。 因此,在组织中组建项目团队至关重要,这样他们就会在决策过程中欢迎人工智能。 当前的技术革命要求公司快速变革,并增加了对团队在促进创新采用方面的作…...
图文深入理解Oracle Network配置管理(一)
List item 本篇图文深入介绍Oracle Network配置管理。 Oracle Network概述 Oracle Net 服务 Oracle Net 监听程序 <oracle_home>/network/admin/listener.ora <oracle_home>/network/admin/sqlnet.ora建立网络连接 要建立客户机或中间层连接,Oracle…...
leetcode-链表篇3
leetcode-61 给你一个链表的头节点 head ,旋转链表,将链表每个节点向右移动 k 个位置。 示例 1: 输入:head [1,2,3,4,5], k 2 输出:[4,5,1,2,3]示例 2: 输入:head [0,1,2], k 4 输出&#x…...
RAG(Retrieval Augmented Generation)及衍生框架:CRAG、Self-RAG与HyDe的深入探讨
近年来,随着大型语言模型(LLMs)的迅猛发展,我们在寻求更精确、更可靠的语言生成能力上取得了显著进展。其中,检索增强生成(Retrieval-Augmented Generation)作为一种创新方法,极大地…...
C语言介绍
什么是C语言 C programing language 能干什么 Hello world? 如何学C语言 no reading no learning...
损失函数篇 | YOLOv10 更换损失函数之 MPDIoU | 《2023 一种用于高效准确的边界框回归的损失函数》
论文地址:https://arxiv.org/pdf/2307.07662v1.pdf 边界框回归(Bounding Box Regression,BBR)在目标检测和实例分割中得到了广泛应用,是目标定位的重要步骤。然而,对于边界框回归的大多数现有损失函数来说,当预测的边界框与真值边界框具有相同的长宽比,但宽度和高度的…...
WMware安装WMware Tools(Linux~Ubuntu)
1、这里终端里面输入sudo apt upgrade用于更新最新的包 sudo apt upgrade 2、安装 open-vm-tools-desktop 包, Ps:这里是以为我已经安装好了。 udo apt install open-vm-tools-desktop -y3、最后重启就大功告成了 reboot 4、测试是否成功:…...
eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...
ffmpeg(四):滤镜命令
FFmpeg 的滤镜命令是用于音视频处理中的强大工具,可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下: ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜: ffmpeg…...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
IP如何挑?2025年海外专线IP如何购买?
你花了时间和预算买了IP,结果IP质量不佳,项目效率低下不说,还可能带来莫名的网络问题,是不是太闹心了?尤其是在面对海外专线IP时,到底怎么才能买到适合自己的呢?所以,挑IP绝对是个技…...
