当前位置: 首页 > news >正文

RabbitMQ解决消息积压的方法

目录

减少发送mq的消息体内容

增加消费者数量

批量消费消息

临时队列转移

监控和预警机制

分阶段实施

最后还有一个方法就是开启队列的懒加载


这篇文章总结一下自己知道的解决消息积压得方法。

减少发送mq的消息体内容

像我们没有必要知道一个的中间状态,只需知道一个最终状态就可以了。
发送的消息体只用包含:id和状态等关键信息,不用发送一个完整的对象内容。
消费者收到消息之后,通过id调用原服务再将完整的消息对象内容查询出来即可,最后再进行消费处理。

增加消费者数量

采用动态增加消费者的数量

@Configuration
public class RabbitMQConfig {@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置并发消费者数量factory.setConcurrentConsumers(5);        // 初始消费者数量factory.setMaxConcurrentConsumers(20);    // 最大消费者数量// 动态调整消费者数量factory.setConsumerTagStrategy(queue -> "consumer-" + UUID.randomUUID());return factory;}
}
@Service
public class ConsumerManagerService {@Autowiredprivate RabbitListenerEndpointRegistry registry;public void adjustConsumerCount(String queueName, int count) {MessageListenerContainer container = registry.getListenerContainer(queueName);if (container instanceof SimpleMessageListenerContainer) {SimpleMessageListenerContainer simpleContainer = (SimpleMessageListenerContainer) container;simpleContainer.setConcurrentConsumers(count);}}
}

批量消费消息

@Service
public class BatchMessageConsumer {@RabbitListener(queues = "myQueue", containerFactory = "batchFactory")public void processBatch(List<Message> messages, Channel channel) {try {// 批量处理消息List<MessageDTO> dtos = messages.stream().map(this::convertToDTO).collect(Collectors.toList());// 批量保存到数据库batchSaveToDatabase(dtos);// 获取最后一条消息的deliveryTaglong lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();// 批量确认channel.basicAck(lastDeliveryTag, true);} catch (Exception e) {// 批量拒绝handleBatchError(messages, channel);}}
}// 配置批量消费
@Configuration
public class BatchConsumerConfig {@Beanpublic SimpleRabbitListenerContainerFactory batchFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 启用批量消费factory.setBatchListener(true);// 批量大小factory.setBatchSize(100);// 批量超时时间factory.setReceiveTimeout(1000L);return factory;}
}

临时队列转移

@Service
public class MessageTransferService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void transferMessages(String sourceQueue, String tempQueue, int batchSize) {while (true) {// 从源队列批量获取消息List<Message> messages = new ArrayList<>();for (int i = 0; i < batchSize; i++) {Message message = rabbitTemplate.receive(sourceQueue);if (message == null) break;messages.add(message);}if (messages.isEmpty()) break;// 转移到临时队列messages.forEach(msg -> rabbitTemplate.send(tempQueue, msg));}}
}// 临时队列的消费者
@Component
public class TempQueueConsumer {@RabbitListener(queues = "#{tempQueue.name}")public void processMessage(Message message) {// 使用更高效的处理方式fastProcessMessage(message);}@Beanpublic Queue tempQueue() {return new Queue("temp-queue-" + UUID.randomUUID(), false, false, true);}
}

监控和预警机制

@Service
@Slf4j
public class QueueMonitorService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Scheduled(fixedRate = 60000) // 每分钟执行一次public void monitorQueueSize() {String queueName = "myQueue";// 获取队列信息Properties properties = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(queueName));// 获取消息数量int messageCount = properties.getMessageCount();// 检查消息堆积if (messageCount > threshold) {// 发送告警sendAlert(queueName, messageCount);// 动态调整消费者adjustConsumers(queueName, messageCount);}}private void adjustConsumers(String queueName, int messageCount) {// 根据消息数量动态调整消费者数量int newConsumerCount = calculateConsumerCount(messageCount);consumerManagerService.adjustConsumerCount(queueName, newConsumerCount);}
}

分阶段实施

@Service
public class MessageHandlingStrategy {public void handleMessageBacklog() {// 1. 首先增加消费者数量adjustConsumerCount();// 2. 如果仍然堆积,启用批量处理if (isStillBacklogged()) {enableBatchProcessing();}// 3. 如果问题持续,使用临时队列if (isEmergency()) {transferToTemporaryQueue();}}
}

最后还有一个方法就是开启队列的懒加载

相关文章:

RabbitMQ解决消息积压的方法

目录 减少发送mq的消息体内容 增加消费者数量 批量消费消息 临时队列转移 监控和预警机制 分阶段实施 最后还有一个方法就是开启队列的懒加载 这篇文章总结一下自己知道的解决消息积压得方法。 减少发送mq的消息体内容 像我们没有必要知道一个的中间状态&#xff0c;只需…...

Android 网络层相关介绍

关注 Android 默认支持的网络管理行为,默认支持的网络服务功能。 功能术语 术语缩写全称释义DHCPv6Dynamic Host Configuration Protocol for IPv6动态主机配置协议的第六版,用于在IPv6网络中动态分配IP地址和其他网络配置参数。DNS Domain Name System域名系统。LLALink-Loc…...

2025年第三届“华数杯”国际赛B题解题思路与代码(Matlab版)

问题1&#xff1a;产业关联性分析 在 question1.m 文件中&#xff0c;我们分析了中国主要产业之间的相互关系。以下是代码的详细解读&#xff1a; % 问题1&#xff1a;分析中国主要产业之间的相互关系function question1()% 清空工作区和命令窗口clear;clc;% 设置中文显示set…...

小米路由器IPv6 功能使用指南

本文不限于多层路由使用IPv6 的情况&#xff0c;提供解决IPv6 无法获取的更硬核的方法&#xff0c;需要有ssh 工具。&#xff08;无安卓设备&#xff0c;测试环境win、mac、ios&#xff09; 首先明确一点&#xff0c;就是如果想让你的设备得到GUA 地址&#xff0c;即访问 6.i…...

k8s dashboard离线部署步骤

确定k8s版本&#xff0c;以1.23为例。 部署metrics-server服务&#xff0c;最好用v0.5.2。 用v0.6.0&#xff0c;可能会报以下错误&#xff1a; nodekubemaster:~/Desktop/metric$ kubectl top nodes Error from server (ServiceUnavailable): the server is currently unabl…...

Wireshark抓包教程(2024最新版个人笔记)

改内容是个人的学习笔记 Wireshark抓包教程&#xff08;2024最新版&#xff09;_哔哩哔哩_bilibili 该课程笔记1-16 wireshark基础 什么是抓包工具&#xff1a;用来抓取数据包的一个软件 wireshark的功能&#xff1a;用来网络故障排查&#xff1b;用来学习网络技术 wireshark下…...

稀疏矩阵:BM25;稠密矩阵:RoBERTa - wwm - ext顺序

稀疏矩阵:BM25;稠密矩阵:RoBERTa - wwm - ext顺序 先后顺序 先BM25后RoBERTa - wwm - ext: 流程说明:首先可以使用BM25进行初步的检索。由于BM25是基于词频等统计信息的检索模型,它能够快速地从大规模文档集合中筛选出可能包含相关信息的文档子集。例如,在一个包含大量新…...

C# 结构体(Struct)

C# 结构体(Struct) 引言 在C#编程语言中,结构体(Struct)是一种值类型,它允许用户自定义数据类型。结构体可以包含多个成员,如字段、属性、构造函数和方法。与类(Class)相似,但结构体在内存管理、性能和继承方面有其独特的特点。本文将详细介绍C#结构体的概念、用法…...

Homestyler 和 Tripo AI 如何利用人工智能驱动的 3D 建模改变定制室内设计

让设计梦想照进现实 在Homestyler,我们致力于为每一个梦想设计师提供灵感的源泉,而非挫折。无论是初学者打造第一套公寓,或是专业设计师展示作品集,我们的直观工具都能让您轻松以惊人的3D形式呈现空间。 挑战:实现定制设计的新纪元 我们知道,将个人物品如传家宝椅子、…...

Python的pandas库基础知识(超详细教学)

目录 一、配置环境 二、序列和数据表 2.1 初始化 2.2 获取数值 2.3 获取索引 2.4 索引取内容 2.5 索引改变取值 2.6 字典生成序列 2.7 计算取值出现次数 2.8 数据表 2.9 数据表添加新变量 2.10 获取列名 2.11 根据列名获取数据 2.12 输出固定行 2.13 输出多行…...

【数据库】一、数据库系统概述

文章目录 一、数据库系统概述1 基本概念2 现实世界的信息化过程3 数据库系统内部体系结构4 数据库系统外部体系结构5 数据管理方式 一、数据库系统概述 1 基本概念 数据&#xff1a;描述事物的符号记录 数据库&#xff08;DB&#xff09;&#xff1a;长期存储在计算机内的、…...

大数据智能选课系统

1.产品介绍 产品名称&#xff1a;大数据智能选课系统 一、产品概述 随着信息技术的快速发展&#xff0c;大数据技术在教育领域的应用越来越广泛。针对当前高校选课过程中的繁琐操作、资源分配不均等问题&#xff0c;我们研发了一款基于大数据智能分析的选课系统。本系统旨在…...

esp32开发笔记之一:esp32开发环境搭建vscode+ubuntu

最近想用esp32做一个物联网项目&#xff0c;踩坑N个终于有点心得&#xff0c;写下来避免和我一样的小白踩无谓的坑。 写在前面&#xff1a; 第一&#xff0c;大家一定要用linux系统作为编译工具&#xff0c;速度上是windows无法比的&#xff0c;不要因为不熟悉linux而选择win…...

赛灵思(Xilinx)公司Artix-7系列FPGA

苦难从不值得歌颂&#xff0c;在苦难中萃取的坚韧才值得珍视&#xff1b; 痛苦同样不必美化&#xff0c;从痛苦中开掘出希望才是壮举。 没有人是绝对意义的主角&#xff0c; 但每个人又都是自己生活剧本里的英雄。滑雪&#xff0c;是姿态优雅的“贴地飞行”&#xff0c;也有着成…...

Trie树算法

Trie树&#xff0c;也称为前缀树或字典树&#xff0c;是一种特殊的树型数据结构。它用于存储一组字符串&#xff0c;使得查找、插入和删除字符串的操作非常高效。类似这种&#xff0c; 模板&#xff1a; 这是用数组来模拟上图中的树的结构&#xff0c;逻辑上和上图结构一致。 …...

NLTK分词以及处理方法

在自然语言处理(NLP)的领域中,文本的处理是一个基础且核心的环节,特别是在大规模数据分析和文本挖掘中。无论是聊天机器人、情感分析,还是机器翻译,分词都是必不可少的步骤之一。分词的目的是将长篇的文本拆解为较小的单位(如单词或句子),这些单位是后续分析和处理的基…...

vue3树形组件+封装+应用

文章目录 概要应用场景代码注释综合评价注意事项功能拓展代码说明概要 创建一个基于Vue 3的树形结构组件,用于展示具有层级关系的数据,并提供了节点展开/折叠、点击等交互功能。以下是对其应用场景、代码注释以及综合评价和注意事项的详细说明。 应用场景 这个组件适用于需…...

kotlin项目无法访问Java类的问题

使用IntelliJ创建一个Kotlin项目&#xff0c;然后在src/main/kotlin中创建一个java接口&#xff1a;Animal.java&#xff0c;然后在Main.kt中打印这个java接口&#xff0c;如下&#xff1a; fun main() {println(Animal::class.java) }代码在编辑器中并没有报错&#xff0c;但…...

计算机网络 (30)多协议标签交换MPLS

前言 多协议标签交换&#xff08;Multi-Protocol Label Switching&#xff0c;MPLS&#xff09;是一种在开放的通信网上利用标签引导数据高速、高效传输的新技术。 一、基本概念 MPLS是一种第三代网络架构技术&#xff0c;旨在提供高速、可靠的IP骨干网络交换。它通过将IP地址映…...

qt-C++笔记之自定义继承类初始化时涉及到parents的初始化

qt-C笔记之自定义继承类初始化时涉及到parents的初始化 code review! 参考笔记 1.qt-C笔记之父类窗口、父类控件、对象树的关系 2.qt-C笔记之继承自 QWidget和继承自QObject 并通过 getWidget() 显示窗口或控件时的区别和原理 3.qt-C笔记之自定义类继承自 QObject 与 QWidget …...

人才选拔中,如何优化面试流程

在与某大型央企的深入交流中&#xff0c;随着该企业的不断壮大与业务扩张&#xff0c;对技术人才的需求急剧上升&#xff0c;尽管企业加大了招聘力度并投入了大量资源&#xff0c;但招聘成效却不尽如人意。经过项目组细致调研与访谈&#xff0c;问题的根源逐渐浮出水面&#xf…...

2501wtl,皮肤技术

下载地址 设计目标 最重要的是使用方便,已有程序创建一个COM对象,调一个方法就可把界面外观全部改成Mac风格的. 另外一个目标是要有扩展性. 所以,基本设计是定义一个统一的接口,然后用不同实现.每一个实现单独放在一个COMDLL中,调用者选择一个类标创建对象就行了. 接口的定义…...

【面试题】技术场景 6、Java 生产环境 bug 排查

生产环境 bug 排查思路 分析日志&#xff1a;首先通过分析日志查看是否存在错误信息&#xff0c;利用之前讲过的 elk 及查看日志的命令缩小查找错误范围&#xff0c;方便定位问题。远程 debug 适用环境&#xff1a;一般公司正式生产环境不允许远程 debug&#xff0c;多在测试环…...

word论文排版常见问题汇总

word论文排版常见问题汇总 常用快捷键&#xff1a; Alt F9 正常模式与域代码模式切换 Ctrl F9 插入域代码 F9 刷新域代码显示&#xff0c;要注意选定后刷新才会有效果 word中在当前列表的基础上修改列表 在使用word时&#xff0c;我们会定义一个列表&#xff0c;并将其链接…...

传奇3仿韩服单机版安装教程+GM管理面板

今天为大家带来一款怀旧网单《传奇3仿韩服》的游戏架设&#xff0c;适用于单机娱乐&#xff0c; 仅供怀旧&#xff0c;本人已经安装游戏成功&#xff0c;特此带来详细安装教程。 适用环境 单机 视频演示 传奇3仿韩服单机 亲测截图 架设步骤 关闭默认杀毒软件和其它自己下的杀…...

第26章 汇编语言--- 内核态与用户态

汇编语言是低级编程语言的一种&#xff0c;它与特定计算机的硬件架构紧密相关。内核态和用户态是操作系统中进程运行的两种不同模式&#xff0c;它们用来区分操作系统内核代码和其他应用程序代码的执行环境。下面我将简要解释这两种状态&#xff0c;并给出一个简单的示例来展示…...

Spring bean的生命周期和扩展

接AnnotationConfigApplicationContext流程看实例化的beanPostProcessor-CSDN博客&#xff0c;以具体实例看bean生命周期的一些执行阶段 bean生命周期流程 生命周期扩展处理说明实例化:createBeanInstance 构造方法&#xff0c; 如Autowired的构造方法注入依赖bean 如UserSer…...

计算机网络 (33)传输控制协议TCP概述

一、定义与基本概念 TCP是一种面向连接的、可靠的、基于字节流的传输层通信协议。它工作在OSI模型的第四层&#xff0c;即传输层&#xff0c;为用户提供可靠的、有序的和无差错的数据传输服务。TCP协议与UDP协议是传输层的两大主要协议&#xff0c;但两者在设计上有明显的不同&…...

Python3 JSON

JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;易于人阅读和编写&#xff0c;同时也易于机器解析和生成。它基于JavaScript编程语言的一个子集&#xff0c;但JSON是独立于语言的&#xff0c;很多编程语言都支持JSON格式数据的…...

Leetcode 698 Partition to K Equal Sum Subsets

题意 给一个数组&#xff0c;要求把数组里的元素分成k个子集&#xff0c;满足每个子集中数的总和是相等的。问是否能分成k个子集 题目链接 https://leetcode.com/problems/partition-to-k-equal-sum-subsets/description/ 思考 想象你有k个桶&#xff0c;然后你有n个小球&…...

网站建设的策划/深圳百度推广联系方式

原因就是在odoo.conf配置文件中没有说明 模块查找的路径 转载于:https://www.cnblogs.com/one-tom/p/11567238.html...

南京做网站企业/广东seo

在浏览地图时&#xff0c;移动鼠标经过某个对象或者POI的时候&#xff0c;能够提示该对象的名称对用户来说是很实用的&#xff0c;本文讲述在Arcgis for Js中&#xff0c;用两种不同的方式来实现该效果。 为了有个直观的概念&#xff0c;先给大家看看实现后的效果&#xff1a; …...

青岛做网站的/营销 推广

本题解不一定正确&#xff0c;欢迎大家指正 A&#xff1a;2023 【问题描述】 请求出在 12345678 至 98765432 中&#xff0c;有多少个数中完全不包含 2023 。 完全不包含 2023 是指无论将这个数的哪些数位移除都不能得到 2023 。 例如 20322175&#xff0c;33220022 都完全不包…...

有专门做dnf工作室的网站么/搜索引擎优化分析

对dictObject list进行排序 dict [{x: 1, y:3}&#xff0c; {x: 2, y:4}] * x 的值大于2优先排 * y 按照从小到大顺序排列 for d in dict:d.is_x_more_than_two True if d.x >2 else False dict.sort(key: lambda d: (not d.is_x_more_than_two, d.y)) * x 的值大于2优先排…...

武汉做网站哪个好/腾讯企业qq官网

理解 SVG 中的 Viewport 和 ViewBox - 實做縮放&#xff08;zoom&#xff09;和拖曳&#xff08;drag&#xff09;效果 本文章同步刊載於 PJCHENder 前端網頁資源站 不同於以往將 SVG 視為一張圖案&#xff08;ICON 或 LOGO&#xff09;的概念&#xff0c;在這篇文章中&#xf…...

凡客v十商城还在吗/seo诊断专家

将一个字符串转换成一个整数&#xff0c;要求不能使用字符串转换整数的库函数。 数值为0或者字符串不是一个合法的数值则返回0&#xff1b; 思路&#xff1a;遍历查找非整数字符&#xff0c;第一个字符串位置进行判断&#xff0c; 采用-‘0’算出整数*10的多少次方&#xff1b…...