typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存
Manjaro安装RabbitMQ
安装
sudo pacman -S rabbitmq rabbitmqadmin
启动管理模块
sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server
管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:
- 添加用户:
rabbitmqctl add_user mingcai password
请将 password 替换为您想要设置的实际密码。
- 分配权限:
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"
这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."。
- 可选步骤:设置用户角色:
您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:
rabbitmqctl set_user_tags mingcai administrator
这样,用户 mingcai 就被赋予了管理员权限。
请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。
死信队列


标题:利用RabbitMQ死信队列处理消息的三种情况
在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。
1. 消息被拒绝
当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。
// 引入amqplib库
import * as amqp from 'amqplib';// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');// 创建Channel
const channel = await connection.createChannel();// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {// 设置死信交换机deadLetterExchange: 'my_dead_letter_exchange'
});// 消费消息
channel.consume(queueName, (msg) => {// 处理消息if (msg) {// 处理失败,拒绝消息并将其重新放回队列// channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列// 处理失败,拒绝消息channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列// or 处理失败,拒绝消息并将其重新放回死信队列channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息}
});
2. 消息过期
有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。
// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {expiration: '60000' // 设置过期时间为60秒
});
3. 队列达到最大长度
为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。
// 定义队列
await channel.assertQueue(queueName, {maxLength: 100, // 设置最大队列长度为100deadLetterExchange: 'my_dead_letter_exchange'
});
通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。
以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!
延时队列
什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等



npm install amqplib --save
npm install @types/amqplib --save-dev

总结

rabbitmqadmin 使用入门
rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:
export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672
export RABBITMQ_USER=mingcai
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
- 查看 RabbitMQ 服务器信息:
rabbitmqadmin status
- 列出所有交换机:
rabbitmqadmin list exchanges
- 列出所有队列:
rabbitmqadmin list queues
- 创建一个交换机:
rabbitmqadmin declare exchange name=my_exchange type=direct
- 创建一个队列:
rabbitmqadmin declare queue name=my_queue
- 绑定队列到交换机:
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
- 发送消息到指定交换机:
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
- 获取队列消息:
rabbitmqadmin get queue=my_queue
这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。
延时3秒和8秒全部代码
// delayProducer.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const exchange = 'routing_exchange';// 定义 dlx-exchangeconst dlxExchangeName = 'dlx-exchange';// 声明交换机await channel.assertExchange(exchange, 'direct', {durable: true});await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失const dlxqueueBindings= [{dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',},{dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'}];for (const binding of dlxqueueBindings) {// 绑定延迟死信队列await channel.assertQueue(binding.dlxQueueName );//死信交换机和死信队列绑定 Routing key fast 的消息await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机}// 定义队列和路由键的映射const queueBindings = [{queue: '3_second_queue', routingKey: 'fast', arguments: {'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}},{queue: '8_second_queue', routingKey: 'slow', arguments: {'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}}];// 声明队列,并将队列绑定到交换机上for (const binding of queueBindings) {await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});await channel.bindQueue(binding.queue, exchange, binding.routingKey);}for (let i = 0; i < 10; i++) {await new Promise((resolve) => {setTimeout(() => {resolve(1)}, 1000)})const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log('当前中国时间:', chinaTime);// 发送消息到交换机,并设置不同的路由键await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);}// 关闭连接setTimeout(async () => {await channel.close();await connection.close();}, 10000); // 在 10 秒后关闭连接
}async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {channel.publish(exchange, routingKey, Buffer.from(message));console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-3_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-8_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);相关文章:
typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存
Manjaro安装RabbitMQ 安装 sudo pacman -S rabbitmq rabbitmqadmin启动管理模块 sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server管理界面 http://127.0.0.1:15672/ 默认用户名和密码都是guest。 要使用 rabbitmqctl 命令添加用户并分配权限…...
怎样才能把重建大师的空三导进去CC?
导出空三文件xml两者都是通用的,cc和photoscan都可以兼容。 重建大师是一款专为超大规模实景三维数据生产而设计的集群并行处理软件,输入倾斜照片,激光点云,POS信息及像控点,输出高精度彩色网格模型,可一键…...
命令模式(请求与具体实现解耦)
目录 前言 UML plantuml 类图 实战代码 模板 Command Invoker Receiver Client 前言 命令模式解耦了命令请求者(Invoker)和命令执行者(receiver),使得 Invoker 不再直接引用 receiver,而是依赖于…...
开发一款MMOARPG难度到底有多大
开发一款MMOARPG难度到底有多大 MMORPG游戏开发到底有多难,我们按照过去开发的标准,就比如开发一款传奇,那时候哪会用什么别人的引擎,都是自研,从基础图形API开始。我们不考虑美术和策划,就单指程序&#x…...
RTSP应用:实现视频流的实时推送
在实现实时视频流推送的项目中,RTSP(Real Time Streaming Protocol)协议扮演着核心角色。本文将指导你通过安装FFmpeg软件,下载并编译live555,以及配置ffmpeg进行视频流推送,来实现一个基本的RTSP流媒体服务…...
Java八股文(数据结构)
Java八股文の数据结构 数据结构 数据结构 请解释以下数据结构的概念:链表、栈、队列和树。 链表是一种线性数据结构,由节点组成,每个节点包含了指向下一个节点的指针; 栈是一种后进先出(LIFO)的数据结构&a…...
ActiveMQ Artemis 系列| High Availability 主备模式(消息复制) 版本2.19.1
一、ActiveMQ Artemis 介绍 Apache ActiveMQ Artemis 是一个高性能的开源消息代理,它完全符合 Java Message Service (JMS) 2.0 规范,并支持多种通信协议,包括 AMQP、MQTT、STOMP 和 OpenWire 等。ActiveMQ Artemis 由 Apache Software Foun…...
QGIS插件系列--WhiteBox Tools
WhiteBox Tools(官网机翻): WhiteboxTools是由圭尔夫大学地貌测量和水文地理信息学研究小组(GHRG)开发的高级地理空间软件包和数据分析平台。该项目始于2017年<>月,并在分析能力方面迅速发展。WhiteboxTools的一…...
SpringMVC设置全局异常处理器
文章目录 背景分析使用ControllerAdvice(RestControllerAdvice)ExceptionHandler实现全局异常全局异常处理-多个处理器匹配顺序存在一个类中存在不同的类中 对于过滤器和拦截器中的异常,有两种思路可以考虑 背景 在项目中我们有需求做一个全…...
Acwing_795前缀和 【一维前缀和】+【模板】二维前缀和
Acwing_795前缀和 【一维前缀和】 题目: 代码: #include <bits/stdc.h> #define int long long #define INF 0X3f3f3f3f #define endl \n using namespace std; const int N 100010; int arr[N];int n,m; int l,r; signed main(){std::ios::s…...
docker 部署 gitlab-ce 16.9.1
文章目录 [toc]拉取 gitlab-ce 镜像创建 gitlab-ce 持久化目录启停脚本配置配置 gitlab-ce编辑 gitlab-ce 配置文件重启 gitlab-ce配置 root 密码 设置中文 gitlab/gitlab-ce(需要科学上网) 拉取 gitlab-ce 镜像 docker pull gitlab/gitlab-ce:16.9.1-ce.0查看镜像是不是有 Vo…...
29.Python从入门到精通—Python3 面向对象继承 多继承 方法重写 类属性与方法
29.从入门到精通:Python3 面向对象继承 多继承 方法重写 类属性与方法 继承多继承方法重写类属性与方法 继承 在面向对象编程中,继承是指通过继承现有类的属性和方法来创建新类的过程。新类称为子类(或派生类),现有类…...
jQuery如何获取元素宽高?
在jQuery中,获取元素的宽和高有多种方法,取决于你是否需要包括边框、内边距或其他额外空间。以下是几种常用的方式: 获取元素内容区域的宽和高(不包括边框和内边距): var width $(#yourElement).width(); …...
springdata框架对es集成
什么是spring data框架 Spring Data是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的开源框架。其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce框架和云计算数据服务。Spring Data可以极大的简化JPA(Elasticsearch…)的…...
jvm(虚拟机)运行时数据区域介绍
Java虚拟机(JVM)运行时数据区域是Java程序在运行过程中使用的内存区域,它主要包括以下几个部分: 程序计数器(Program Counter Register): 程序计数器是一块较小的内存区域,是线程私有…...
C++ MFC 只启动一个程序实例 唤醒之前的实例(完整源码)
初级代码游戏的专栏介绍与文章目录-CSDN博客 很多时候我们希望只允许启动一个程序实例,如果再次运行,就唤醒之前的实例。 目录 1 概述 2 相关技术介绍 2.1 互斥对象 2.2 查找窗口 2.3 唤醒窗口 1 概述 技术上并不难,涉及到以下几个技术…...
2024多云管理平台CMP排名看这里!
随着云计算技术的迅猛发展,多云管理平台CMP应运而生。多云管理平台CMP仅能够简化对多个云环境的统一管理,还能提高资源利用效率和降低成本。因此了解多云管理平台CMP品牌是必要的。2024多云管理平台CMP排名看这里!仅供参考哈! 20…...
MySQL 数据库的日志管理、备份与恢复
一. 数据库备份 1.数据备份的重要性 备份的主要目的是灾难恢复。 在生产环境中,数据的安全性至关重要。 任何数据的丢失都可能产生严重的后果。 造成数据丢失的原因: 程序错误人为,操作错误,运算错误,磁盘故障灾难(如火灾、地震࿰…...
一、Go开发环境搭建
文章目录 1、开发工具2、开发环境配置3、Hello World4、语法 1、开发工具 https://code.visualstudio.com/download 2、开发环境配置 类比Java的JDK,go的SDK下载:https://studygolang.com/dl 解压: 配置环境变量path,将命令&quo…...
包子凑数(蓝桥杯,闫氏DP分析法)
题目描述: 小明几乎每天早晨都会在一家包子铺吃早餐。 他发现这家包子铺有 N 种蒸笼,其中第 i 种蒸笼恰好能放 Ai 个包子。 每种蒸笼都有非常多笼,可以认为是无限笼。 每当有顾客想买 X 个包子,卖包子的大叔就会迅速选出若干笼…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
业务系统对接大模型的基础方案:架构设计与关键步骤
业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
掌握 HTTP 请求:理解 cURL GET 语法
cURL 是一个强大的命令行工具,用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中,cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
