当前位置: 首页 > news >正文

RabbitMQ使用StringRedisTemplate-防止重复消费

造成重复消费的原因:

MQ向消费者推送message,消费者向MQ返回ack,告知所推送的消息消费成功。但是由于网络波动等原因,可能造成消费者向MQ返回的ack丢失。MQ长时间(一分钟)收不到ack,于是会向消费者再次推送该条message,这样就造成了重复消费。

解决重复消费的办法:

用存储(redis或者mysql)记录一下已经消费的message的id,当message被消费前先去存储中查一下消费记录,没有该条message的id则正常消费返回ack,有该条message的id的话不用消费直接返回ack给MQ。

当然实际生产中的话选用redis是比较好的选择,毕竟查mysql要进行磁盘IO,效率要低得多,而且绝大多数重复消费都是由于MQ没有收到消费者的ack于是造成MQ再次向消费者进行同一条message的投递。所以message的消费记录其实我们并不需要一直记录,只需要保存一段时间,当下次投递过来的时候消费者能查到消费记录然后准确返回ack给MQ就行。

yml

#配置rabbitMq 服务器rabbitmq:host: xxxx#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口port: xxxxusername: xxxxpassword: xxxx#虚拟host 可以不设置,使用server默认hostvirtual-host: xxxxconnection-timeout: 0#确认消息已发送到队列(Queue)publisher-returns: true #确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 设置消费端手动 acklistener:simple:retry:# 开启消费者(程序出现异常的情况下)进行重试enabled: true#重试间隔时间max-interval: 1000# 最大重试次数max-attempts: 3#开启手动确认消息acknowledge-mode: manual

监听类
 

package com.rabbitmqprovider.service;import com.rabbitmq.client.Channel;
import com.rabbitmqprovider.commons.CommonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.io.IOException;/**** 防止重复消费*/
@Slf4j
@Service
public class TestBasicService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** RabbitListener 可以写在类、方法上* @param channel* @param message* @throws IOException*///@RabbitListener(queues = {CommonUtils.queueStr})@RabbitHandlerpublic void getMessage(Channel channel, Message message) throws IOException {try{String messageId= message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");//判断messageId在redis中是否存在boolean flage=stringRedisTemplate(messageId,msg);if(!flage){log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息}else{//如果要防止 重复消费,则需要将 id值存在 redis,每次 都要去redis中拿id比对,是否存在,存在则消费过->messageIdchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("接收到的消息{}->"+redisTemplate.opsForValue().get(messageId));}}catch (Exception e){if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 判断Key是否存在* @param messageId 唯一表示key* @param msg       value值* @return*/private boolean stringRedisTemplate(String messageId,String msg){log.info("messageId="+messageId);//判断Key是否存在 有则返回true,没有则返回falseif(redisTemplate.hasKey(messageId)){return false;}else{redisTemplate.opsForValue().setIfAbsent(messageId, msg);}return true;}
}

------------------------------------------controller--------------------------------------------------

/*** 解决重复消费问题*/
@GetMapping("/sendMessageTestOnly")
public void sendMessageTestOnly(){JSONObject jsonObject = new JSONObject();jsonObject.put("message","世界很大!");jsonObject.put("msg","你想去看看么?");String json = jsonObject.toJSONString();String messageId=UUID.randomUUID()+"";Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(messageId).build();rabbitTemplate.convertAndSend(CommonUtils.dirExchange,CommonUtils.routingKey,message,new CorrelationData(UUID.randomUUID().toString()));
}

---------------------------------回调------------------------------------------------------

package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 队列防止消息丢失*/
@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback{@Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",new String(message.getBody()), exchange, routingKey, replyCode, replyText);}
}
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 当消息由生产者发到交换机后会回调该接口中的confirm方法*/
@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 内含消息内容* ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败* cause 表示失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){log.info("交换机收到消息 消息内容为{}->", correlationData);}else {log.info("交换机未收到消息消息内容为{}, 原因为{}->", correlationData, cause);}}}

-----------------------------------------------------------------------------------------------------------------

执行顺序时:先发送消息;然后在接收消息,并判断消息是否重复,如果不重复 则回复消息,否则 拒绝回复;最后回调。

相关文章:

RabbitMQ使用StringRedisTemplate-防止重复消费

造成重复消费的原因: MQ向消费者推送message,消费者向MQ返回ack,告知所推送的消息消费成功。但是由于网络波动等原因,可能造成消费者向MQ返回的ack丢失。MQ长时间(一分钟)收不到ack,于是会向消…...

临沂大学张继群寄语

目录 寄语 1、不能有不良睹好 2、坚毅的个性和勤奋的品质 3、会存钱...

线程学习笔记

1:Thread 线程的生命周期控制 2:Runnable 可执行的任务和程序 3:Callable 执行程序后返回结果 4:Future 收集程序返回结果 5:Executor 线程池 6:ForkJoin 默认线程池 每个线程有工作队列 工作窃取 7:RunnableFuture FutureTask 实现 Runnable 和 Future 执…...

代码随想录算法训练营第四十二天|01背包问题,你该了解这些!、01背包问题,你该了解这些! 滚动数组 、416. 分割等和子集

文章目录 01背包问题,你该了解这些!01背包问题,你该了解这些! 滚动数组416. 分割等和子集 01背包问题,你该了解这些! 题目链接:代码随想录 二维数组解决0-1背包问题 解题思路: 1.dp…...

结构体指针、数组指针和结构体数组指针

结构体指针 首先让我们定义结构体: struct stu { char name[20]; long number; float score[4]; }; 再定义指向结构体类型变量的指针变量: struct stu *student; /*定义结构体类型指针*/ student malloc(sizeof(struct stu)); /*为指针变量分…...

项目架构一些注意点

考虑系统的 稳定性 一、微服务的稳定性 1、如何解决那些不稳定的因素/问题?也是常说的如何容错。 2、一个系统的高可用取决于它本身和其强依赖的组件的高可用 3、消除单点 保活机制 健康检查 注册中心如何保障稳定性 注册中心集群 微服务本身对注册信息的本地持…...

Forefront GPT-4免费版:开启无限畅聊时代,乐享人工智能快感,无限制“白嫖”,还能和N多角色一起聊天?赶紧注册,再过些时间估计就要收费了

目录 前言注册登录方式应用体验聊天体验绘图体验 “是打算先免费后收费吗?”建议其它资料下载 前言 近期,人工智能技术迎来重大飞跃,OpenAI的ChatGPT等工具成为全球数亿人探索提高生产力和增强创造力的新方法。人们现在可以使用人工智能驱动…...

深入浅出 Compose Compiler(1) Kotlin Compiler KCP

前言 Compose 的语法简洁、代码效率非常高,这主要得益于 Compose Compiler 的一系列编译期魔法,帮开发者生成了很多样板代码。但编译期插桩也阻碍了我们对于 Compose 运行原理的认知,想要真正读懂 Compose 就必须先了解它的 Compiler。本系列…...

BatchNormalization和LayerNormalization的理解、适用范围、PyTorch代码示例

文章目录 为什么要NormalizationBatchNormLayerNormtorch代码示例 学习神经网络归一化时,文章形形色色,但没找到适合小白通俗易懂且全面的。学习过后,特此记录。 为什么要Normalization 当输入数据量级极大或极小时,为保证输出数…...

大数据 | 实验二:文档倒排索引算法实现

文章目录 📚实验目的📚实验平台📚实验内容🐇在本地编写程序和调试🥕代码框架思路🥕代码实现 🐇在集群上提交作业并执行🥕在集群上提交作业并执行,同本地执行相比即需修改…...

Java文档注释-JavaDoc标签

标签含义author指定作者{code}使用代码字体以原样显示信息,不处理HTML样式deprecated指定程序元素已经过时{docRoot}指定当前文档的根目录路径exception标识由方法或构造函数抛出的异常{inheritDoc}从直接超类中继承注释{link}插入指向另外一个主题的内联链接{linkp…...

黑盒测试过程中【测试方法】详解5-输入域,输出域,猜错法

在黑盒测试过程中,有9种常用的方法:1.等价类划分 2.边界值分析 3.判定表法 4.正交实验法 5.流程图分析 6.因果图法 7.输入域覆盖法 8.输出域覆盖法 9.猜错法 黑盒测试过程中【测试方法】讲解1-等价类,边界值,判定表_朝一…...

Python学习之sh(shell脚本)在Python中的使用

文章目录 前言一、sh是什么?二、使用步骤1.安装2.使用示例3.使用sh执行命令4.关键字参数5.查找命令6.Baking参数 前言 本文章向大家介绍[Python库]分析一个python库–sh(系统调用),主要内容包括其使用实例、应用技巧、基本知识点…...

追求卓越:编写高质量代码的方法和技巧

本文讨论了编写高质量代码的重要性,并详细介绍了高质量代码的特征、编程实践技巧和软件工程方法论。通过遵循这些原则和实践,程序员可以编写出更稳定、可维护和可扩展的代码。 一、 前言 写出高质量代码是每个程序员的追求和目标。高质量的代码可以使程…...

MATLAB算法实战应用案例精讲-【人工智能】机器视觉(概念篇)(最终篇)

目录 前言 几个高频面试题目 如何评价一个光源的好坏? 如何依靠光源增强图像对比度?...

【老王读SpringMVC-3】根据 url 是如何找到 controller method 的?

前面分析了 request 与 handler method 映射关系的注册,现在再来分析一下 SpringMVC 是如何根据 request 来获取对应的 handler method 的? 可能有人会说,既然已经将 request 与 handler method 映射关系注册保存在了 AbstractHandlerMethodMapping.Ma…...

人机交互到艺术设计及玫瑰花绘制实例

Python库之图形用户界面 Riverbank Computing | Introduction Welcome to wxPython! | wxPython Overview — PyGObject Python库之游戏开发 https://www.pygame.org/news Panda3D | Open Source Framework for 3D Rendering & Games python.cocos2d.org Python库之…...

多臂老虎机问题

1.问题简介 多臂老虎机问题可以被看作简化版的强化学习问题,算是最简单的“和环境交互中的学习”的一种形式,不存在状态信息,只有动作和奖励。多臂老虎机中的探索与利用(exploration vs. exploitation)问题一直以来都…...

DNS 查询原理详解

DNS(Domain Name System)是互联网上的一种命名系统,它将域名转换为IP地址。在进行DNS查询时,先要明确需要查询的主机名,然后向本地DNS服务器发出查询请求。 1. 本地DNS服务器查询 当用户在浏览器中输入一个URL或者点…...

浅谈软件测试工程师的技能树

软件测试工程师是一个历史很悠久的职位,可以说从有软件开发这个行业以来,就开始有了软件测试工程师的角色。随着时代的发展,软件测试工程师的角色和职责也在悄然发生着变化,从一开始单纯的在瀑布式开发流程中担任测试阶段的执行者…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

2.Vue编写一个app

1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中&#xff0c;部分节点存储的数据量或访问量远高于其他节点&#xff0c;导致这些节点负载过高&#xff0c;影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...