当前位置: 首页 > news >正文

RabbitMQ之顺序消费

什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。

不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。

如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现


private Queue creatQueue(String name){// 创建一个 单活模式 队列HashMap<String, Object> args=new HashMap<>();args.put("x-single-active-consumer",true);return new Queue(name,true,false,false,args);}

创建之后,我们可以在控制台看到 消费者的激活状态
在这里插入图片描述

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {@Beanpublic Queue queue15_0() {return creatQueue(Message15.QUEUE_0);}@Beanpublic Queue queue15_1() {return creatQueue(Message15.QUEUE_1);}@Beanpublic Queue queue15_2() {return creatQueue(Message15.QUEUE_2);}@Beanpublic Queue queue15_3() {return creatQueue(Message15.QUEUE_3);}@Beanpublic DirectExchange exchange15() {// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它return new DirectExchange(Message15.EXCHANGE, true, false);}@Beanpublic Binding binding15_0() {return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");}@Beanpublic Binding binding15_1() {return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");}@Beanpublic Binding binding15_2() {return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");}@Beanpublic Binding binding15_3() {return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");}/*** 创建一个 单活 模式的队列* 注意 :* <p>* 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除** @param name* @return queue*/private Queue creatQueue(String name) {// 创建一个 单活模式 队列HashMap<String, Object> args = new HashMap<>();args.put("x-single-active-consumer", true);return new Queue(name, true, false, false, args);}
=================================》生产者
@Component
@Slf4j
public class Producer15 {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 这里的发送是 拟投递到多个队列中** @param id  业务id* @param msg 业务信息*/public void syncSend(int id, String msg) {Message15 message = new Message15(id, msg);rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);}/*** 根据 id 取余来决定丢到那个队列中去** @param id id* @return routingKey*/private String getRoutingKey(int id) {return String.valueOf(id % Message15.QUEUE_COUNT);}
}
============================》消费者
/*** 要想保证消息的顺序,每个队列只能有一个消费者** @author 深漂码农@明哥* @date 2024-03-18*/
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {@RabbitHandlerpublic void onMessage(Message15 message) throws InterruptedException {log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);// 这里随机睡一会,模拟业务处理时候的耗时long l = new Random(1000).nextLong();TimeUnit.MILLISECONDS.sleep(l);}
}
==============================》测试类
@Testvoid mock() throws InterruptedException {// 先启动这个测试类,模拟多个副本情况下,看如何消费new CountDownLatch(1).await();}@Testvoid syncSend() throws InterruptedException {// 模拟每个队列中扔 10 个数据,看看效果for (int i = 0; i < 10; i++) {for (int j = 0; j < 4; j++) {producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");}}TimeUnit.SECONDS.sleep(20);}
}

ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

相关文章:

RabbitMQ之顺序消费

什么是顺序消费 例如&#xff1a;业务上产生者发送三条消息&#xff0c; 分别是对同一条数据的增加、修改、删除操作&#xff0c; 如果没有保证顺序消费&#xff0c;执行顺序可能变成删除、修改、增加&#xff0c;这就乱了。 如何保证顺序性 一般我们讨论如何保证消息的顺序性&…...

轻松上手的LangChain学习说明书

一、Langchain是什么&#xff1f; 如今各类AI模型层出不穷&#xff0c;百花齐放&#xff0c;大佬们开发的速度永远遥遥领先于学习者的学习速度。。为了解放生产力&#xff0c;不让应用层开发人员受限于各语言模型的生产部署中…LangChain横空出世界。 Langchain可以说是现阶段…...

【论文笔记】Training language models to follow instructions with human feedback A部分

Training language models to follow instructions with human feedback A 部分 回顾一下第一代 GPT-1 &#xff1a; 设计思路是 “海量无标记文本进行无监督预训练少量有标签文本有监督微调” 范式&#xff1b;模型架构是基于 Transformer 的叠加解码器&#xff08;掩码自注意…...

嵌入式交叉编译:x265

下载 multicoreware / x265_git / Downloads — Bitbucket 解压编译 BUILD_DIR${HOME}/build_libs CROSS_NAMEaarch64-mix210-linuxcd build/aarch64-linuxmake cleancmake \-G "Unix Makefiles" \-DCMAKE_C_COMPILER${CROSS_NAME}-gcc \-DCMAKE_CXX_COMPILER${CR…...

一、Redis五种常用数据类型

Redis优势&#xff1a; 1、性能高—基于内存实现数据的存储 2、丰富的数据类型 5种常用&#xff0c;3种高级 3、原子—redis的所有单个操作都是原子性&#xff0c;即要么成功&#xff0c;要么失败。其多个操作也支持采用事务的方式实现原子性。 Redis特点&#xff1a; 1、支持…...

C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等的介绍

文章目录 前言一、为什么存在动态内存管理二、动态内存函数的介绍1. malloc函数2. 内存泄漏3. 动态内存开辟位置4. free函数5. calloc 函数6. realloc 函数7. realloc 传空指针 总结 前言 C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等…...

最近惊爆谷歌裁员

Python团队还没解散完&#xff0c;谷歌又对Flutter、Dart动手了。 什么原因呢&#xff0c;猜测啊。 谷歌裁员Python的具体原因可能是因为公司在进行技术栈的调整和优化。Python作为一种脚本语言&#xff0c;在某些情况下可能无法提供足够的性能或者扩展性&#xff0c;尤其是在…...

音频可视化:原生音频API为前端带来的全新可能!

音频API是一组提供给网页开发者的接口&#xff0c;允许他们直接在浏览器中处理音频内容。这些API使得在不依赖任何外部插件的情况下操作和控制音频成为可能。 Web Audio API 可以进行音频的播放、处理、合成以及分析等操作。借助于这些工具&#xff0c;开发者可以实现自定义的音…...

【中等】保研/考研408机试-动态规划1(01背包、完全背包、多重背包)

背包问题基本上都是模板题&#xff0c;重点&#xff1a;弄熟多重背包模板 dp[j]max(dp[j-v[i]]w[i],dp[j]) //核心思路代码&#xff08;一维数组版&#xff09; dp[i][j]max(dp[i-1][j], dp[i-1][j-v[i]]w[i])//二维数字版 一、 0-1背包 一般输入两个变量&#xff1a;体积&…...

[DEMO]给两个字符串取交集的词语

要求&#xff1a;2个英文字符串中&#xff0c;取相同的大于等于4个字母的词组 比如&#xff1a; 字符串1&#xff1a;" xingMeiLingabcdef WorldHello", 字符串2&#xff1a;"mnjqlup WorldLingLing xingMeiLingHello" 获取交接&#xff1a; [xingMeiLing…...

leetcode53-Maximum Subarray

题目 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 示例 1&#xff1a; 输入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4] 输出&#xf…...

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之七 简单进行人脸检测并添加面具特效实现

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之七 简单进行人脸检测并添加面具特效实现 目录...

【go项目01_学习记录06】

学习记录 1 使用中间件1.1 测试一下1.2 push代码 2 URI 中的斜杆2.1 StrictSlash2.2 兼容 POST 请求 1 使用中间件 代码中存在重复率很高的代码 w.Header().Set("Content-Type", "text/html; charsetutf-8")统一对响应做处理的&#xff0c;我们可以使用中…...

Vue中Element的下载

打开vscode让项目在终端中打开 输入npm install element-ui2.15.3 然后进行下载 在node_modules中出现element-ui表示下载完成 然后在输入Vue.use(ElementUI); import Vue from vue import App from ./App.vue import router from ./router import ElementUI from element-ui…...

机器人项目相关

机器人项目相关 1. Nvidia 1.1 Jetson 1.1.1 初步安装Riva教程 llamaspeakJetson AGX Orin踩坑记录&#xff08;1&#xff09;安装Riva 参考知乎链接&#xff1a;https://zhuanlan.zhihu.com/p/670007305 1.1.2 NVIDIA Jetson AI Lab 借助 NVIDIA Jetson™ 将生成式 AI…...

Mac升级go版本某种错误情况处理

当看到 "go1.21 is keg-only, which means it was not symlinked into /opt/homebrew" 这样的信息时&#xff0c;意味着Homebrew没有自动为你创建指向新版本Go的符号链接&#xff08;symlink&#xff09;&#xff0c;因为这是一个旧版本Go的替代版本。 Homebrew中的…...

美团KV存储squirrel和Celler学习

文章目录 美团在KV存储squirrel优化和改进在水平方向1、对Gossip协议进行优化 在垂直扩展方面1、forkless RDB数据复制优化2、使用多线程&#xff0c;充分利用机器的多核能力 在高可用方面 美团持久化kv存储celler优化和改进水平扩展优化1、使用bulkload进行数据导入2、线程模型…...

Python学习笔记------处理数据和生成折线图

给定数据&#xff1a; jsonp_1629344292311_69436({"status":0,"msg":"success","data":[{"name":"美国","trend":{"updateDate":["2.22","2.23","2.24",&qu…...

知识图谱与大语言模型的协同(RAG)——MindMap

MindMap : Knowledge Graph Prompting Sparks Graph of Thoughts in Large Language Models 论文地址: https://arxiv.org/abs/2308.09729 代码:https://github.com/wylwilling/MindMap 1.概述 大型语言模型(LLMs)在处理新信息、防止生成幻觉内容、以及增强决策过程透明度…...

奶爸预备 |《P.E.T.父母效能训练:让亲子沟通如此高效而简单:21世纪版》 / 托马斯·戈登——读书笔记

目录 引出致中国读者译序前言第1章 父母总是被指责&#xff0c;而非受训练第2章 父母是人&#xff0c;不是神第3章 如何听&#xff0c;孩子才会说&#xff1a;接纳性语言第4章 让积极倾听发挥作用第5章 如何倾听不会说话的婴幼儿第6章 如何听&#xff0c;孩子才肯听第8章 通过改…...

【WebGIS实例】(13)MapboxGL 加载地形高程数据

前言 官网示例&#xff1a;Add 3D terrain to a map | Mapbox GL JS | Mapbox 大佬博客&#xff1a;Mapbox GL基础&#xff08;七&#xff09;&#xff1a;地形数据的处理与加载 (jl1mall.com) 加载Mapbox地形数据 map.once(style.load, () > {map.addSource(mapbox-dem,…...

Node.js -- MongoDB

文章目录 1. 相关介绍2. 核心概念3. 命令行交互3.1数据库命令3.2 集合命令3.3 文档命令 4. 数据库应用场景4.1 新增4.2 删除4.3 更新4.4 查询 5. 图形化工具Robo 3T 1. 相关介绍 一、简介 Mongodb是什么 MongoDB是一个基于分布式文件存储的数据库&#xff0c;官方地址https://…...

语音识别--单声道转换与降采样

⚠申明&#xff1a; 未经许可&#xff0c;禁止以任何形式转载&#xff0c;若要引用&#xff0c;请标注链接地址。 全文共计3077字&#xff0c;阅读大概需要3分钟 &#x1f308;更多学习内容&#xff0c; 欢迎&#x1f44f;关注&#x1f440;【文末】我的个人微信公众号&#xf…...

基于springboot+vue+Mysql的点餐平台网站

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…...

数据库优化

一、主从读写分离 主库:主要负责数据的写入。 从库:主要负责数据的查询。 引出问题: 可能会存在主从延迟,导致主从一致性问题。查询主库的量级需要控制。数据量庞大,索引也占据存储空间,磁盘空间不足,当主库宕机后会影响所有模块的写入,需要进行数据分片,因此引出分库…...

专业渗透测试 Phpsploit-Framework(PSF)框架软件小白入门教程(一)

本系列课程&#xff0c;将重点讲解Phpsploit-Framework框架软件的基础使用&#xff01; 本文章仅提供学习&#xff0c;切勿将其用于不法手段&#xff01; Phpsploit-Framework&#xff08;简称 PSF&#xff09;框架软件&#xff0c;是一款什么样的软件呢&#xff1f; Phpspl…...

Web安全研究(七)

NDSS 2023 开源地址&#xff1a;https://github.com/bfpmeasurementgithub/browser-fingeprint-measurement 霍普金斯大学 文章结构 introbackground threat model measurement methodology step1: traffic analysisstep2: fingerprint analysis dataset attack statisticsbro…...

矩池云jupyter运行opengait代码 未完成版

文章目录 前言——矩池云的使用技巧1.切换源 一、下载数据集二、下载模型三、环境配置1.查看python、torch、torchvision版本2.查看一些包版本是否过高3.下载包 四、开始训练1.设置环境变量2.遇到的问题&#xff08;1&#xff09;torch.cuda.is_available()返回false&#xff0…...

油烟净化器买家必看!商用油烟净化器功效及使用方法盘点

我最近分析了餐饮市场的油烟净化器等产品报告&#xff0c;解决了餐饮业厨房油腻的难题&#xff0c;更加方便了在餐饮业和商业场所有需求的小伙伴们。 在选择商用油烟净化器时&#xff0c;了解其功效和正确的使用方法至关重要。让我们一起来盘点一下。 高效净化油烟 商用油烟…...

gitee关联picgo设置自己的typora_图床

一&#xff1a;去gitee官网创建仓库&#xff1a;typora_图床 1.百度搜索关键字&#xff1a;gitee&#xff0c;进入官网 2.进入gitee登录或者注册自己的账号 3.进入主页后&#xff0c;点击右上方 4.点击新建仓库 5.设置仓库名&#xff1a;typora_图床 6.点击5的创建&#xff0…...

做图片网站咋样/全国最新实时大数据

1. Echarts环境配置选择需要的&#xff0c;然后等待Build完成之后&#xff0c;就会自动弹出下载框啦&#xff01;把下载好的js放在web目录下~ 然后在对应的jsp内导入噢~2.Echarts代码pageEncoding"UTF-8"%>Insert title here// 基于准备好的dom&#xff0c;初始化…...

爱佳倍 北京网站/google浏览器官网

1. ETCD是什么ETCD是用于共享配置和服务发现的分布式&#xff0c;一致性的KV存储系统。该项目目前最新稳定版本为2.3.0. 具体信息请参考[项目首页]和[Github]。ETCD是CoreOS公司发起的一个开源项目&#xff0c;授权协议为Apache。提供配置共享和服务发现的系统比较多&#xff0…...

网站建站网站299266co/百度推广后台

只要有标准的DES加密和解密算法&#xff0c;类似ANSI-X99MAC算法和PBOC3DES算法就很好实现。他们都是用DES算法再经过一层算法实现的。实现原理看图就能看明白。3DES算法实现就更简单了。就是DES算法再加解密一次。/*********************************************************…...

北龙中网 可信网站验证 费用/什么是核心关键词

可重入 可重入是指同一个线程如果首次获得了这把锁&#xff0c;那么因为它是这把锁的拥有者&#xff0c;因此有权利再次获取这把锁 如果是不可重入锁&#xff0c;那么第二次获得锁时&#xff0c;自己也会被锁挡住 synchronized可重入举例 static final Object obj n…...

淘宝客的网站是怎么做的/百度双十一活动

QT 加载文件&#xff0c;图片路径很容易搞混&#xff0c;需要注意的是WINDOW路径分隔符为“”,QT为“/”&#xff0c;我遇到的路径加载总结为三种情况&#xff1a; &#xff08;1&#xff09;绝对路径&#xff0c;文件的整个路径&#xff0c;比如 setWindowIcon(QIcon("F:…...

青岛网站制作价格/网络广告有哪些形式

锁相环是一个能对输入信号进行自动跟踪的负反馈控制电路。锁相环在通信、无线电电子学、自动控制和电力系统自动化等领域得到了极为广泛的应用&#xff0c;其性能的好坏将直接影响整个电子系统的工作性能[1]。随着数字技术的不断发展&#xff0c;全数字锁相环的应用范围也更加广…...