当前位置: 首页 > news >正文

学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?
真的好用吗?鲜有人提的 RabbitMQ-RPC模式



在这里插入图片描述
前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是延迟队列?

延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。

二、延迟队列的实现

延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。

在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。

1. x-delayed-message插件

x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件

首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下
在这里插入图片描述
然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可
然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");

不难发现,此时其实是交换机在做延迟,
在这里插入图片描述

当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.headers(new HashMap<String, Object>(){{put("x-delay", 5000);}});
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());

2. TTL + 死信队列

此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:
在这里插入图片描述

首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
arguments.put("x-message-ttl", 5000);channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");

在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:

channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

三、手写延时队列

当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列

1. 时间轮概念

在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中
在这里插入图片描述
时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务
在这里插入图片描述

2. JAVA演示

我们先使用JUC相关内容实现一个时间轮

import java.util.*;
import java.util.concurrent.*;class TimeWheel {private int size;private int currentIndex;private List<BlockingQueue<Task>> slots;private Executor executor;public TimeWheel(int size, Executor executor) {this.size = size;this.slots = new ArrayList<>(size);for (int i = 0; i < size; i++) {slots.add(new LinkedBlockingQueue<>());}this.executor = executor;}public void addTask(Task task) {int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;slots.get(expireIndex).add(task);}public void start() {new Thread(() -> {while (true) {currentIndex = (currentIndex + 1) % size;BlockingQueue<Task> currentSlot = slots.get(currentIndex);List<Task> tasks = new ArrayList<>();currentSlot.drainTo(tasks);for (Task task : tasks) {executor.execute(task);}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}class Task implements Runnable {private long delay; // 延迟时间,单位毫秒private Runnable task; // 任务public Task(long delay, Runnable task) {this.delay = delay;this.task = task;}public long getDelay() {return delay;}@Overridepublic void run() {task.run();}
}

我们可以使用main方法来尝试验证这个时间轮效果:

    public static void main(String[] args) {TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));// 添加任务,延迟5秒执行timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));// 添加任务,延迟10秒执行timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));// 启动时间轮timeWheel.start();}

在这里插入图片描述

当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。

四、应用场景与注意事项

1. 应用场景

  1. 红包预告
    在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包
    在这里插入图片描述

  2. 订单系统
    在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。

  3. 优惠券系统
    在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。

2. 注意事项

  1. 延迟队列不要使用太多
    使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。

  2. 延迟队列可能会导致消息丢失
    在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。

  3. 设置合适的延迟时间
    在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。

总结

RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。

相关文章:

学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录 手把手教你&#xff0c;本地RabbitMQ服务搭建&#xff08;windows&#xff09; 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用&#xff0c;怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉&#xff1f; RabbitMQ 消费模式该如何选择 死信是什么…...

ChatGPT会取代搜索引擎吗?BingChat、GoogleBard与ChatGPT区别

目前暂时不会&#xff0c;ChatGPT为代表的聊天机器人很可能会直接集成到搜索中&#xff0c;而不是取代它。微软已经通过Bing Chat和Bing做到了这一点&#xff0c;它将“聊天”选项卡直接放入Bing搜索的菜单中。Google、百度也分别开始尝试通过其AI生成技术将Google Bard、文心一…...

多个QLabel中文字左右对其问题研究

众所周知&#xff0c;关于QLabel 中的文字对其方式&#xff0c;官方提供多种&#xff0c;具体可参考 AlignmentFlag&#xff0c;这里就不详细列举了。 实际开发中有这样一个需求&#xff1a;多个lab中&#xff0c;文字显示不同&#xff0c;长度不一&#xff0c;但想要实现视觉…...

链式二叉树统计结点个数的方法和bug

方法一&#xff1a; 分治&#xff1a;分而治之 int BTreeSize1(BTNode* root) {if (root NULL) return 0;else return BTreeSize(root->left)BTreeSize(root->right)1; } 方法二&#xff1a; 遍历计数&#xff1a;设置一个计数器&#xff0c;对二叉树正常访问&#…...

C语言-报错集锦-03-malloc(): memory corruption: 0x0000000001496d90 ***

一、报错信息 [2023-8]--[ Debug ]--Push Data To StAccessPath OK. [2023-8]--[ Debug ]--Judge Vertex(0) Is Not Accessed. [2023-8]--[ Debug ]--Judge Vertex(2) Is Accessed. [2023-8]--[ Debug ]--Judge Vertex(3) Is Not Accessed. [2023-8]--[ Debug ]--Judge Vertex…...

现代C++中的从头开始深度学习:【5/8】卷积

一、说明 在上一个故事中&#xff0c;我们介绍了机器学习的一些最相关的编码方面&#xff0c;例如 functional 规划、矢量化和线性代数规划。 现在&#xff0c;让我们通过使用 2D 卷积实现实际编码深度学习模型来开始我们的道路。让我们开始吧。 二、关于本系列 我们将学习如何…...

以太网帧格式与吞吐量计算

以太网帧结构 帧大小的定义 以太网单个最大帧 6&#xff08;目的MAC地址&#xff09; 6&#xff08;源MAC地址&#xff09; 2&#xff08;帧类型&#xff09; 1500{IP数据包[IP头&#xff08;20&#xff09;DATA&#xff08;1480&#xff09;]} 4&#xff08;CRC校验&#xff…...

vue中install方法

1&#xff1a;语法 vue提供install可供我们开发新的插件及全局注册组件等 install方法第一个参数是vue的构造器&#xff0c;第二个参数是可选的选项对象 export default {install(Vue,option){组件指令混入挂载vue原型} }2&#xff1a;注册组件 一&#xff1a;注册单个组件 1…...

Flutter:文件读取—— video_player、chewie、image_picker、file_picker

前言 简单学习一下几个比较好用的文件读取库 video_player 简介 用于视频播放 官方文档 https://pub-web.flutter-io.cn/packages/video_player 安装 flutter pub add video_player加载网络视频 class _MyHomePageState extends State<MyHomePage> {// 控制器late…...

vim的使用

vim文本编辑器 vim介绍命令模式光标移动选中内容复制内容粘贴内容删除撤销/恢复字符转换 编辑模式末行模式保存/退出查找行号显示文件切换 扩展 vim介绍 vim是Linux自带的文本编辑器&#xff0c;具有命令模式、编辑模式、末行模式三种模式。 模式间的切换&#xff1a; 命令模…...

马氏杆法检查斜视

使用 检查水平向斜视时&#xff0c;使用水平向马氏杆检查;重直向斜视时&#xff0c;使用重直问马氏杆;检查旋转斜视时&#xff0c;使用双马氏杆. 检查水平向斜视 双眼屈光不正全矫 双眼同时打开&#xff0c;右眼前加水平向马氏杆&#xff0c;左眼前不加 双眼同时观察点光源&…...

Mac电脑怎么使用“磁盘工具”修复磁盘

我们可以使用“磁盘工具”的“急救”功能来查找和修复磁盘错误。 “磁盘工具”可以查找和修复与 Mac 磁盘的格式及目录结构有关的错误。使用 Mac 时&#xff0c;错误可能会导致意外行为&#xff0c;而重大错误甚至可能会导致 Mac 彻底无法启动。 继续之前&#xff0c;请确保您…...

c++画出分割图像,水平线和垂直线

1、pca 找到图像某个区域的垂直线&#xff0c;并画出来 // 1、 斑块的框 血管二值化图&#xff0c;pca 找到垂直血管壁的直线, 还是根据斑块找主轴方向吧// Step 1: 提取斑块左右范围内的血管像素点坐标&#xff0c;std::vector<cv::Point> points;for (int y 0; y <…...

Python 程序设计入门(015)—— enumerate() 函数的用法

Python 程序设计入门&#xff08;015&#xff09;—— enumerate() 函数的用法 目录 Python 程序设计入门&#xff08;015&#xff09;—— enumerate() 函数的用法一、enumerate() 函数的语法二、为可迭代对象创建索引三、将字符串、列表等转换为字典1、将字符串转换为字典2、…...

__dict__属性

__dict__ 是 Python 中的一个特殊属性&#xff0c;通常存在于大多数 Python 对象中&#xff0c;用于存储该对象的可变属性。 以下是关于 __dict__ 的一些关键点和详细信息&#xff1a; 存储属性&#xff1a;对于大多数自定义的 Python 对象&#xff0c;__dict__ 属性包含了这个…...

k8s之Pod控制器

目录 一、Pod控制器及其功用二、pod控制器的多种类型2.1 pod容器中的有状态和无状态的区别 三、Deployment 控制器四、SatefulSet 控制器4.1 StatefulSet由以下几个部分组成4.2 为什么要有headless&#xff1f;4.3 为什么要有volumeClaimTemplate&#xff1f;4.4 滚动更新4.5 扩…...

逆元(求乘法逆元的几种方法)

目录 逆元 加法逆元 乘法逆元 如何求 快速幂 扩展欧几里得 O(n)求1到n的乘法逆元 逆元 数学中&#xff0c;逆元素&#xff08;英语&#xff1a;Inverse element&#xff09;推广了加法中的加法逆元和乘法中的倒数。直观地说&#xff0c;它是一个可以取消另一给定元素运…...

没点本事,还真做不好数字化转型

数字化转型逐渐成为企业业务增长的利器 然而&#xff0c;在此过程中 企业最应该注重哪些&#xff1f; 效率&#xff1f;质量&#xff1f; 但还有一个至关重要的点不容忽视 那就是安全 有一家硬核企业通过技术与狠活 硬生生提升了应用安全性 保障了产业与数字化的安全融合…...

windows 10 远程桌面配置

1. 修改远程桌面端口&#xff08;3389&#xff09; 打开注册表&#xff08;winr&#xff09;, 输入regedit 找到配置项【计算机\HKEY_LOCAL_MACHINE\SYSTEM\ControlSet001\Control\Terminal Server\Wds\rdpwd\Tds\tcp】 &#xff0c; 可以通过搜索“Wds”快速定位。 修改端口配…...

OpenStreetMap 上基于A*搜索算法的C ++路线规划项目

引言 在现代的地理信息系统&#xff08;GIS&#xff09;中&#xff0c;路线规划是一个重要的组成部分。它涉及到从一个地点到另一个地点的最优路径的确定。在这篇文章中&#xff0c;我们将探讨如何在OpenStreetMap数据上实现一个基于A*搜索算法的C路线规划项目。 OpenStreetM…...

java实现随机生成验证码

import java.util.concurrent.ThreadLocalRandom;/* 生成验证码的工具 可动态配置验证码长度*/ public class CodeUtils {public static void main(String[] args) {//随机生成5个长度为4的验证码for (int i 0; i < 5; i) {System.out.println(CodeUtils.getCode(4));}for …...

Positive证书是什么?

Positive SSL是全球著名CA Sectigo的子品牌&#xff0c; 也是目前全球签发量最高的商业SSL证书。价格低&#xff0c;安全性高&#xff0c;在个人网站和中小型企业网站中拥有极高的占有率。 Positive SSL证书包括DV SSL&#xff0c; EV SSL&#xff0c;也是唯一支持IP地址加密的…...

vulnhub靶场-y0usef笔记

vulnhub靶场-y0usef笔记 信息收集 首先fscan找到目标机器ip http://192.168.167.70/ nmap扫描端口 Host is up (0.00029s latency). Not shown: 998 closed tcp ports (reset) PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 6.6.1p1 Ubuntu 2ubuntu2.13 (Ub…...

华为智选首款纯电轿跑“LUXEED”能大卖吗?

监制 | 何玺 排版 | 叶媛 华为智选纯电轿跑来袭&#xff01; 8月7日&#xff0c;华为常务董事余承东在社交媒体上发文&#xff0c;宣布华为智选即将推出首款“突破想象”的纯电轿跑车。 01 华为智选首款纯电轿跑来袭 余承东的发文引起了极大关注&#xff0c;在各大媒体的报…...

ArcGIS API for JavaScript 3.44 地图Demo示例合集

ArcGIS API for JavaScript 3.44 demo合集 &#xff08;一&#xff09;创建地图&#xff08;二&#xff09;基准图库&#xff08;三&#xff09;编辑书签&#xff08;四&#xff09;主页按钮&#xff08;五&#xff09;LayerList小部件&#xff08;六&#xff09;测量小工具&am…...

RFID工业识别技术:供应链智能化的科技颠覆

RFID工业识别技术&#xff0c;作为物联网的先锋&#xff0c;正在供应链管理领域展现着前所未有的科技颠覆。从物料追踪到库存管理&#xff0c;再到物流配送&#xff0c;RFID技术以其高效的数据采集和智能的自动化处理&#xff0c;彻底改变着传统供应链的运营方式。 RFID在物料追…...

行列转换两例的思考

1、多行转成一列 (1)、建测试表及插入测试数据 create table t(i int,a varchar2(1)); insert into t(i,a) select 1,a from dual union all select 1,b from dual union all select 1,d from dual union all select 1,e from dual union all select 2,z from dual union all…...

高德地图 SDK 接口测试接入(AndroidTest 上手)

学习资料 官方文档 在 Android 平台上测试应用 | Android 开发者 | Android Developers 测试了解 【玩转Test】开篇-Android test 介绍 Android单元测试全解_android 单元测试_一代小强的博客-CSDN博客 Android单元测试-对Activity的测试_activitytestrule_许佳佳233的博客…...

省电模式稳定电压显示IC32×4 LCD显示驱动芯片

简述 VK1C21A是一个点阵式存储映射的LCD驱动器&#xff0c;可支持最大128点&#xff08;32SEGx4COM&#xff09; 的LCD屏&#xff0c;也支持2COM和3COM的LCD屏。单片机可通过3/4个通信脚配置显示参数和发 送显示数据&#xff0c;也可通过指令进入省电模式。具备高抗干扰&a…...

分布式架构的观测

分布式架构的观测 日志日志的输出收集与缓冲加工与聚合存储与查询 追踪数据收集 度量 在一个分布式应用中&#xff0c;如果出现了某个异常&#xff0c;那我们必然不可能只依靠 awk、grep 等命令来查看日志分析问题&#xff0c;往往分布式架构的一个异常都贯通多个节点&#xff…...

杭州专业网站设计制作/广告推广语

性能优化方案--之一方案计划 Oracle优化 Oracle是咱们存储软件&#xff0c;他自身的优化是决定咱们系统软件性能的根本。 相关需要优化的配置&#xff1a; PGA&#xff1a;适当大小&#xff0c;保证SQL高速缓存命中能在99%以上。 SGA&#xff1a;调整适当增加共享内存池大小&am…...

内容管理系统做网站/网络营销推广实训报告

前情提要&#xff1a; 2021 年进了基科班CSP2021&#xff0c; 思考了半小时想到了一个绝妙的A题做法&#xff0c;半小时rush完&#xff0c;半小时对拍。然后觉得 B 是个傻逼题&#xff0c;连写三种 dp 全假&#xff0c;最后只剩了不到一个小时&#xff0c;边抓头发边rush暴力&…...

团结湖网站建设/南京seo整站优化技术

静态绑定和动态绑定是C多态性的一种特性。 1、对象的静态类型和动态类型&#xff1a; 对象的静态类型&#xff1a;对象在声明是采用的类型&#xff0c;在编译期确定&#xff1b; 对象的动态类型&#xff1a;当前对象所指的类型&#xff0c;在运行期决定&#xff0c;对象的动态类…...

黑龙江企业网站建设公司/网络营销课程大概学什么内容

生产环境中微服务的发布是非常频繁的&#xff0c;对于一些互联网型的项目&#xff0c;甚至在你与他人谈话闲聊的几分钟内便有新的版本发布出来&#xff1b;一般的&#xff0c;每发布一个新的微服务&#xff0c;网关就可能需要为新发布的微服务定义对应的访问路由&#xff0c;如…...

深圳网站优化排名/怎么做电商平台

前言 迭代器貌似是 Python3 才有的(猜的)&#xff0c;在廖雪峰大神的网站中 Python2 是没有迭代器一栏的 可 for 循环的对象 常见集合数据类型(迭代对象)&#xff1a;list、tuple、dict、set、str生成器 generator 可迭代对象(Iterable) 可以直接用 for 循环的对象都叫可迭代对…...

做蛋糕比较火的网站/站长网站工具

lst [1, 2, 4] print lst.__iter__().next() # 打印出来的是 1 print lst.__iter__().next() # 打印出来的是 1# 调用__iter__()方法的时候&#xff0c;生成一个迭代器对象&#xff1b;如上&#xff0c;第二次调用&#xff0c;先生成对象&#xff0c;然后返回的是该对象的第一…...