当前位置: 首页 > news >正文

RabbitMQ之顺序消费

什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。

不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。

如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现


private Queue creatQueue(String name){// 创建一个 单活模式 队列HashMap<String, Object> args=new HashMap<>();args.put("x-single-active-consumer",true);return new Queue(name,true,false,false,args);}

创建之后,我们可以在控制台看到 消费者的激活状态
在这里插入图片描述

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {@Beanpublic Queue queue15_0() {return creatQueue(Message15.QUEUE_0);}@Beanpublic Queue queue15_1() {return creatQueue(Message15.QUEUE_1);}@Beanpublic Queue queue15_2() {return creatQueue(Message15.QUEUE_2);}@Beanpublic Queue queue15_3() {return creatQueue(Message15.QUEUE_3);}@Beanpublic DirectExchange exchange15() {// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它return new DirectExchange(Message15.EXCHANGE, true, false);}@Beanpublic Binding binding15_0() {return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");}@Beanpublic Binding binding15_1() {return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");}@Beanpublic Binding binding15_2() {return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");}@Beanpublic Binding binding15_3() {return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");}/*** 创建一个 单活 模式的队列* 注意 :* <p>* 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除** @param name* @return queue*/private Queue creatQueue(String name) {// 创建一个 单活模式 队列HashMap<String, Object> args = new HashMap<>();args.put("x-single-active-consumer", true);return new Queue(name, true, false, false, args);}
=================================》生产者
@Component
@Slf4j
public class Producer15 {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 这里的发送是 拟投递到多个队列中** @param id  业务id* @param msg 业务信息*/public void syncSend(int id, String msg) {Message15 message = new Message15(id, msg);rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);}/*** 根据 id 取余来决定丢到那个队列中去** @param id id* @return routingKey*/private String getRoutingKey(int id) {return String.valueOf(id % Message15.QUEUE_COUNT);}
}
============================》消费者
/*** 要想保证消息的顺序,每个队列只能有一个消费者** @author 深漂码农@明哥* @date 2024-03-18*/
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {@RabbitHandlerpublic void onMessage(Message15 message) throws InterruptedException {log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);// 这里随机睡一会,模拟业务处理时候的耗时long l = new Random(1000).nextLong();TimeUnit.MILLISECONDS.sleep(l);}
}
==============================》测试类
@Testvoid mock() throws InterruptedException {// 先启动这个测试类,模拟多个副本情况下,看如何消费new CountDownLatch(1).await();}@Testvoid syncSend() throws InterruptedException {// 模拟每个队列中扔 10 个数据,看看效果for (int i = 0; i < 10; i++) {for (int j = 0; j < 4; j++) {producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");}}TimeUnit.SECONDS.sleep(20);}
}

ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

相关文章:

RabbitMQ之顺序消费

什么是顺序消费 例如&#xff1a;业务上产生者发送三条消息&#xff0c; 分别是对同一条数据的增加、修改、删除操作&#xff0c; 如果没有保证顺序消费&#xff0c;执行顺序可能变成删除、修改、增加&#xff0c;这就乱了。 如何保证顺序性 一般我们讨论如何保证消息的顺序性&…...

轻松上手的LangChain学习说明书

一、Langchain是什么&#xff1f; 如今各类AI模型层出不穷&#xff0c;百花齐放&#xff0c;大佬们开发的速度永远遥遥领先于学习者的学习速度。。为了解放生产力&#xff0c;不让应用层开发人员受限于各语言模型的生产部署中…LangChain横空出世界。 Langchain可以说是现阶段…...

【论文笔记】Training language models to follow instructions with human feedback A部分

Training language models to follow instructions with human feedback A 部分 回顾一下第一代 GPT-1 &#xff1a; 设计思路是 “海量无标记文本进行无监督预训练少量有标签文本有监督微调” 范式&#xff1b;模型架构是基于 Transformer 的叠加解码器&#xff08;掩码自注意…...

嵌入式交叉编译:x265

下载 multicoreware / x265_git / Downloads — Bitbucket 解压编译 BUILD_DIR${HOME}/build_libs CROSS_NAMEaarch64-mix210-linuxcd build/aarch64-linuxmake cleancmake \-G "Unix Makefiles" \-DCMAKE_C_COMPILER${CROSS_NAME}-gcc \-DCMAKE_CXX_COMPILER${CR…...

一、Redis五种常用数据类型

Redis优势&#xff1a; 1、性能高—基于内存实现数据的存储 2、丰富的数据类型 5种常用&#xff0c;3种高级 3、原子—redis的所有单个操作都是原子性&#xff0c;即要么成功&#xff0c;要么失败。其多个操作也支持采用事务的方式实现原子性。 Redis特点&#xff1a; 1、支持…...

C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等的介绍

文章目录 前言一、为什么存在动态内存管理二、动态内存函数的介绍1. malloc函数2. 内存泄漏3. 动态内存开辟位置4. free函数5. calloc 函数6. realloc 函数7. realloc 传空指针 总结 前言 C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等…...

最近惊爆谷歌裁员

Python团队还没解散完&#xff0c;谷歌又对Flutter、Dart动手了。 什么原因呢&#xff0c;猜测啊。 谷歌裁员Python的具体原因可能是因为公司在进行技术栈的调整和优化。Python作为一种脚本语言&#xff0c;在某些情况下可能无法提供足够的性能或者扩展性&#xff0c;尤其是在…...

音频可视化:原生音频API为前端带来的全新可能!

音频API是一组提供给网页开发者的接口&#xff0c;允许他们直接在浏览器中处理音频内容。这些API使得在不依赖任何外部插件的情况下操作和控制音频成为可能。 Web Audio API 可以进行音频的播放、处理、合成以及分析等操作。借助于这些工具&#xff0c;开发者可以实现自定义的音…...

【中等】保研/考研408机试-动态规划1(01背包、完全背包、多重背包)

背包问题基本上都是模板题&#xff0c;重点&#xff1a;弄熟多重背包模板 dp[j]max(dp[j-v[i]]w[i],dp[j]) //核心思路代码&#xff08;一维数组版&#xff09; dp[i][j]max(dp[i-1][j], dp[i-1][j-v[i]]w[i])//二维数字版 一、 0-1背包 一般输入两个变量&#xff1a;体积&…...

[DEMO]给两个字符串取交集的词语

要求&#xff1a;2个英文字符串中&#xff0c;取相同的大于等于4个字母的词组 比如&#xff1a; 字符串1&#xff1a;" xingMeiLingabcdef WorldHello", 字符串2&#xff1a;"mnjqlup WorldLingLing xingMeiLingHello" 获取交接&#xff1a; [xingMeiLing…...

leetcode53-Maximum Subarray

题目 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 示例 1&#xff1a; 输入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4] 输出&#xf…...

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之七 简单进行人脸检测并添加面具特效实现

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之七 简单进行人脸检测并添加面具特效实现 目录...

【go项目01_学习记录06】

学习记录 1 使用中间件1.1 测试一下1.2 push代码 2 URI 中的斜杆2.1 StrictSlash2.2 兼容 POST 请求 1 使用中间件 代码中存在重复率很高的代码 w.Header().Set("Content-Type", "text/html; charsetutf-8")统一对响应做处理的&#xff0c;我们可以使用中…...

Vue中Element的下载

打开vscode让项目在终端中打开 输入npm install element-ui2.15.3 然后进行下载 在node_modules中出现element-ui表示下载完成 然后在输入Vue.use(ElementUI); import Vue from vue import App from ./App.vue import router from ./router import ElementUI from element-ui…...

机器人项目相关

机器人项目相关 1. Nvidia 1.1 Jetson 1.1.1 初步安装Riva教程 llamaspeakJetson AGX Orin踩坑记录&#xff08;1&#xff09;安装Riva 参考知乎链接&#xff1a;https://zhuanlan.zhihu.com/p/670007305 1.1.2 NVIDIA Jetson AI Lab 借助 NVIDIA Jetson™ 将生成式 AI…...

Mac升级go版本某种错误情况处理

当看到 "go1.21 is keg-only, which means it was not symlinked into /opt/homebrew" 这样的信息时&#xff0c;意味着Homebrew没有自动为你创建指向新版本Go的符号链接&#xff08;symlink&#xff09;&#xff0c;因为这是一个旧版本Go的替代版本。 Homebrew中的…...

美团KV存储squirrel和Celler学习

文章目录 美团在KV存储squirrel优化和改进在水平方向1、对Gossip协议进行优化 在垂直扩展方面1、forkless RDB数据复制优化2、使用多线程&#xff0c;充分利用机器的多核能力 在高可用方面 美团持久化kv存储celler优化和改进水平扩展优化1、使用bulkload进行数据导入2、线程模型…...

Python学习笔记------处理数据和生成折线图

给定数据&#xff1a; jsonp_1629344292311_69436({"status":0,"msg":"success","data":[{"name":"美国","trend":{"updateDate":["2.22","2.23","2.24",&qu…...

知识图谱与大语言模型的协同(RAG)——MindMap

MindMap : Knowledge Graph Prompting Sparks Graph of Thoughts in Large Language Models 论文地址: https://arxiv.org/abs/2308.09729 代码:https://github.com/wylwilling/MindMap 1.概述 大型语言模型(LLMs)在处理新信息、防止生成幻觉内容、以及增强决策过程透明度…...

奶爸预备 |《P.E.T.父母效能训练:让亲子沟通如此高效而简单:21世纪版》 / 托马斯·戈登——读书笔记

目录 引出致中国读者译序前言第1章 父母总是被指责&#xff0c;而非受训练第2章 父母是人&#xff0c;不是神第3章 如何听&#xff0c;孩子才会说&#xff1a;接纳性语言第4章 让积极倾听发挥作用第5章 如何倾听不会说话的婴幼儿第6章 如何听&#xff0c;孩子才肯听第8章 通过改…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

Linux 中如何提取压缩文件 ?

Linux 是一种流行的开源操作系统&#xff0c;它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间&#xff0c;使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的&#xff0c;要在 …...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

tomcat指定使用的jdk版本

说明 有时候需要对tomcat配置指定的jdk版本号&#xff0c;此时&#xff0c;我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...

Ubuntu系统复制(U盘-电脑硬盘)

所需环境 电脑自带硬盘&#xff1a;1块 (1T) U盘1&#xff1a;Ubuntu系统引导盘&#xff08;用于“U盘2”复制到“电脑自带硬盘”&#xff09; U盘2&#xff1a;Ubuntu系统盘&#xff08;1T&#xff0c;用于被复制&#xff09; &#xff01;&#xff01;&#xff01;建议“电脑…...

VisualXML全新升级 | 新增数据库编辑功能

VisualXML是一个功能强大的网络总线设计工具&#xff0c;专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑&#xff08;如DBC、LDF、ARXML、HEX等&#xff09;&#xff0c;并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...

Monorepo架构: Nx Cloud 扩展能力与缓存加速

借助 Nx Cloud 实现项目协同与加速构建 1 &#xff09; 缓存工作原理分析 在了解了本地缓存和远程缓存之后&#xff0c;我们来探究缓存是如何工作的。以计算文件的哈希串为例&#xff0c;若后续运行任务时文件哈希串未变&#xff0c;系统会直接使用对应的输出和制品文件。 2 …...