rabbitmq+springboot实现幂等性操作
文章目录
- 1.场景描述
-
- 1.1 场景1
- 1.2 场景2
- 2.原理
- 3.实战开发
-
- 3.1 建表
- 3.2 集成mybatis-plus
- 3.3 集成RabbitMq
-
- 3.3.1 安装mq
- 3.3.2 springBoot集成mq
- 3.4 具体实现
-
- 3.4.1 mq配置类
- 3.4.2 生产者
- 3.4.3 消费者
1.场景描述
消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
1.1 场景1
什么意思呢?举个例子:一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
这种情景就会出现消息可能被多次地投递。
1.2 场景2
还有一种场景是程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
以上两个场景对于消息队列来说就是同一个messageId的消息重复投递下来了。
我们利用消息id来判断消息是否已经消费过,如果该信息被消费过,那么消息表中已经 会有一条数据,由于消费时会先执行插入操作,此时会因为主键冲突无法重复插入,我们就利用这个原理来进行幂等的控制,消息内容可以用json格式来进行传输的。
3.实战开发
3.1 建表
DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (`message_id` varchar(50) NOT NULL COMMENT '消息ID',`message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',`status` int DEFAULT '0' COMMENT '消费状态(0-未消费成功;1-消费成功)',`retry_times` int DEFAULT '0' COMMENT '重试次数',`type` int DEFAULT '0' COMMENT '消费类型',PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.2 集成mybatis-plus
《springBoot集成mybatisPlus》
3.3 集成RabbitMq
3.3.1 安装mq
推荐使用docker安装rabbitmq,还未安装的可以参考以下信息:
- docker安装
3.3.2 springBoot集成mq
- 1.添加依赖
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3.4 生产者具体实现
3.4.1 mq配置类
- DirectRabbitConfig
具体如何开启可以参考《rabbitMq实现死信队列》
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitmqConfig {//正常交换机的名字public final static String EXCHANGE\_NAME = "exchange\_name";//正常队列的名字public final static String QUEUE\_NAME="queue\_name";//死信交换机的名字public final static String EXCHANGE\_DEAD = "exchange\_dead";//死信队列的名字public final static String QUEUE\_DEAD="queue\_dead";//死信路由keypublic final static String DEAD\_KEY="dead.key";//创建正常交换机@Bean(EXCHANGE\_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)//持久化 mq重启后数据还在.durable(true).build();}//创建正常队列@Bean(QUEUE\_NAME)public Queue queue(){//正常队列和死信进行绑定 转发到 死信队列,配置参数Map<String,Object>map=getMap();return new Queue(QUEUE\_NAME,true,false,false,map);}//正常队列绑定正常交换机 设置规则 执行绑定 定义路由规则 requestmaping映射@Beanpublic Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,@Qualifier(EXCHANGE\_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange)//路由规则.with("app.#").noargs();}//创建死信队列@Bean(QUEUE\_DEAD)public Queue queueDead(){return new Queue(QUEUE\_DEAD,true,false,false);}//创建死信交换机@Bean(EXCHANGE\_DEAD)public Exchange exchangeDead(){return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD).durable(true) //持久化 mq重启后数据还在.build();}//绑定死信队列和死信交换机@Beanpublic Binding deadBinding(){return BindingBuilder.bind(queueDead()).to(exchangeDead())//路由规则 正常路由key.with(DEAD\_KEY).noargs();}/\*\*获取死信的配置信息\*\*\*/public Map<String,Object>getMap(){//3种方式 任选其一,选择其他方式之前,先把交换机和队列删除了,在启动项目,否则报错。//方式一Map<String,Object> map=new HashMap<>(16);//死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);//死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值map.put("x-dead-letter-routing-key", DEAD\_KEY);//方式二//消息的过期时间,单位:毫秒;达到时间 放入死信队列// map.put("x-message-ttl",5000);//方式三//队列最大长度,超过该最大值,则将从队列头部开始删除消息;放入死信队列一条数据// map.put("x-max-length",3);return map;}}
- 延迟队列配置
具体如何开启可以参考《rabbitMq实现死信队列》
由于rabbitMq中不直接支持死信队列,需要我们利用插件rabbitmq_delayed_messgae_exchage进行开启
/*** 定义延迟交换机*/
@Configuration
public class RabbitMQDelayedConfig {//队列private static final String DELAYQUEUE = "delayedqueue";//交换机private static final String DELAYEXCHANGE = "delayedExchange";@Beanpublic Queue delayqueue(){return new Queue(DELAYQUEUE);}//自定义延迟交换机@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");/*** 1、交换机名称* 2、交换机类型* 3、是否需要持久化* 4、是否需要自动删除* 5、其他参数*/return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);}//绑定队列和延迟交换机@Beanpublic Binding delaybinding(){return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();}
}
3.4.2 生产者
- 1.消费队列的生产者
import com.example.shop.config.RabbitmqConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class Sender_Direct {@Autowiredprivate AmqpTemplate rabbitTemplate;/*** 用于消费订单** @param orderId*/public void send2Direct(String orderId) {//创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)MessageProperties messageProperties = new MessageProperties();rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "内容设置", message -> {//设置消息的id为唯一messageProperties.setMessageId(UUID.randomUUID().toString());messageProperties.setContentType("text/plain");messageProperties.setContentEncoding("utf-8");message.getMessageProperties().setMessageId(orderId);return message;});}}
3.4.3 消费者
1.开启手动ack配置
spring:application:name: shoprabbitmq:host: 192.168.1.102port: 5673virtual-host: /username: guestpassword: guestlistener:simple:# 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 autoacknowledge-mode: manual
消费者要配置ack重试机制,具体参考前几篇文章,使用的是mysql消息ID的唯一性,有时候可能生成一样的订单,具体的没有进行实验,内容是json生成的,可以执行业务
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.des.Bean.MessageIdempotent;
import com.example.des.Bean.Shop;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class Receiver_Direct {private static final Integer delayTimes = 30;//延时消费时间,单位:秒@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = {"smsQueue"})public void receiveD(Message message, Channel channel) throws IOException {try {// 获取消息IdString messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody());//获取消息//向数据库插入数据MessageIdempotent messageIdempotent = new MessageIdempotent();messageIdempotent.setMessageId(messageId);messageIdempotent.setMessageContent(msg);messageIdempotent.setRetryTimes(0);System.out.println(messageIdempotent.toString());Boolean save = true; //设置保存成功,消息投递失败是在确认模式那里if (!save) {//说明属于重重复请求//1、处理消息内容的业务,解析json数据//2、创建订单,并保存Boolean flag = consumeOrder(new Shop());if (flag){//投入延迟队列,如果30分钟订单还没有消费,就删除订单rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{//设置发送消息的延长时间 单位:ms,表示30分钟message1.getMessageProperties().setDelay(1000*60*30);return message1;});//更新消息状态,消费成功,channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}else {//延迟投入死信,进行重试channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}} else {//1、处理消息内容的业务,解析json数据//2、创建订单,并保存//投入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}catch (Exception e){System.out.println("错误信息");}}private boolean consumeOrder(Shop shop) {return true;}@RabbitListener(queues = {" delay.queue.demo.delay.queue"})public void dead(String payload, Message message, Channel channel) throws IOException {System.out.println("死信队列:"+payload);//删除消息 将数据库状态更新为失败,更新邮件或者消息通知,有时候可以人工消费long deliveryTag=message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag,true);}@RabbitListener(queues = "delayedqueue")public void receivemsg(Message messages){//查询有没有被消费,也就是更新成功,有时候需要乐观锁}
}
至此mq的消息重复以及幂等的信息处理就很完美的解决了,当然本文以数据库为例进行实现,感兴趣的可以尝试使用redis来进行实现
相关文章:
rabbitmq+springboot实现幂等性操作
文章目录 1.场景描述 1.1 场景11.2 场景2 2.原理3.实战开发 3.1 建表3.2 集成mybatis-plus3.3 集成RabbitMq 3.3.1 安装mq3.3.2 springBoot集成mq 3.4 具体实现 3.4.1 mq配置类3.4.2 生产者3.4.3 消费者 1.场景描述 消息中间件是分布式系统常用的组件,无论是异…...
ubuntu server 更改时区:上海
1. 打开终端,在命令行中以超级用户或具有sudo权限的用户身份运行以下命令: sudo dpkg-reconfigure tzdata 这会打开一个对话框,用于选择系统的时区设置。 2. 在对话框中,使用上下箭头键在地区列表中选择"Asia"&#x…...
java 整合 swagger-ui 步骤
1.在xml 中添加Swagger 相关依赖 <!-- springfox-swagger2 --><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><!-- springfox-swa…...
介绍两款生成神经网络架构示意图的工具:NN-SVG和PlotNeuralNet
对于神经网络架构的可视化是很有意义的,可以在很大程度上帮助到我们清晰直观地了解到整个架构,我们在前面的 PyTorch的ONNX结合MNIST手写数字数据集的应用(.pth和.onnx的转换与onnx运行时) 有介绍,可以将模型架构文件(常见的格式都可以)在线上…...
iOS IdiotAVplayer实现视频分片缓存
文章目录 IdiotAVplayer 实现视频切片缓存一 iOS视频边下边播原理一 分片下载的实现1 分片下载的思路2 IdiotAVplayer 实现架构 三 IdiotAVplayer 代码解析IdiotPlayerIdiotResourceLoaderIdiotDownLoader IdiotAVplayer 实现视频切片缓存 一 iOS视频边下边播原理 初始化AVUR…...
SpringBootWeb请求-响应
HTTP请求 前后端分离 在这种模式下,前端技术人员基于"接口文档",开发前端程序;后端技术人员也基于"接口文档",开发后端程序。 由于前后端分离,对我们后端技术人员来讲,在开发过程中&a…...
List集合详解
目录 1、集合是什么? 1.1、集合与集合之间的关系 2、List集合的特点 3、遍历集合的三种方式 3.1、foreach(增强佛如循环遍历) 3.2、for循环遍历 3.3、迭代器遍历 4、LinkedList和ArrayList的区别 4.1、为什么ArrayList查询会快一些? 4.2、为什么LinkedLi…...
投稿指南【NO.12_8】【极易投中】核心期刊投稿(组合机床与自动化加工技术)
近期有不少同学咨询投稿期刊的问题,大部分院校的研究生都有发学术论文的要求,少部分要求高的甚至需要SCI或者多篇核心期刊论文才可以毕业,但是核心期刊要求论文质量高且审稿周期长,所以本博客梳理一些计算机特别是人工智能相关的期…...
解决git无法上传大文件(50MB)
解决方法 使用LFS解决GitHub无法上传大于50MB的文件 LFS简介 Git LFS(Large File Storage)是 Git 的一个扩展,用于管理大型文件,如二进制文件、图像、音频和视频文件等。它的主要目的是解决 Git 对大型二进制文件的版本控制和存…...
用递归实现字符串逆序(不使用库函数)
文章目录 前言一、题目要求二、解题步骤1.大概框架2.如何反向排列?3.模拟实现strlen4.实现反向排列5.递归实现反向排列 总结 前言 嗨,亲爱的读者们!我是艾老虎尤,今天,我们将探索一个题目,这个题目对新手非…...
初学python(一)
一、python的背景和前景 二、 python的一些小事项 1、在Java、C中,2 / 3 0,也就是整数 / 整数 整数,会把小数部分舍掉。而在python中2 / 3 0.66666.... 不会舍掉小数部分。 在编程语言中,浮点数遵循IEEE754标准,不…...
Excel VSTO开发8 -相关控件
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 8 相关控件 在VSTO开发中,Ribbon(或称为Ribbon UI)是指Office应用程序中的那个位于顶部的带有选…...
华为数据管理——《华为数据之道》
数据分析与开发 元数据是描述数据的数据,用于打破业务和IT之间的语言障碍,帮助业务更好地理解数据。 元数据是数据中台的重要的基础设施,元数据治理贯彻数据产生、加工、消费的全过程,沉淀了数据资产,搭建了技术和业务…...
Flink CDC 菜鸟教程 -环境篇
本教程将介绍如何使用 Flink CDC 来实现这个需求, 在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 系统的整体架构如下图所示: 环境篇 1、 准备一台Linux 2、准备教程所需要的组件 下载 flink-1.13.2 并将其解压至目录 flink-1.13.2 …...
【线上问题】linux部署docker应用docker-compose启动报端口占用问题(感觉上没有被占用)
目录 一、问题说明二、排查过程 一、问题说明 1.linux服务器使用的不是root用户权限 2.docker应用服务没有关闭的情况下,做了些重装docker,重启docker等操作 3.docker-compose up -d然后docker logs查看日志报端口被占用 4.netstat -ntpl | grep 端口 也…...
解决虚拟机克隆后IP和命名冲突问题
目录 解决IP冲突问题 解决命名冲突 解决IP冲突问题 克隆后的虚拟机和硬件地址和ip和我们原虚拟机的相同,我们需要重新生成硬件地址和定义ip,步骤如下: (1)进入 /etc/sysconfig/network-scripts/ifcfg-ens33 配置文件…...
分享一个python基于数据可视化的智慧社区服务平台源码
💕💕作者:计算机源码社 💕💕个人简介:本人七年开发经验,擅长Java、Python、PHP、.NET、Node.js、微信小程序、爬虫、大数据等,大家有这一块的问题可以一起交流! …...
[密码学入门]凯撒密码
单表代换 单表:英文26字母的顺序 代换:替换为别的字母并保证解密的唯一性 假如我们让加密方式为所有字母顺序移动3位 import stringstring.ascii_lowercase abcdefghijklmnopqrstuvwxyz b3 加密算法y(xb)mod26 解密算法为x(y-b)mod26 密钥空间26 …...
博客之QQ登录功能(一)
流程图 上图spring social 封装了1-8步需要的工作 1、新建包和书写配置文件 public class QQProperties {//App唯一标 识private String appId "100550231";private String appSecret "69b6ab57b22f3c2fe6a6149274e3295e";//QQ供应商private String…...
Redis多机数据库实现
Redis多机数据库实现 为《Redis设计与实现》笔记 复制 客户端可以使用SLAVEOF命令将指定服务器设置为该服务器的主服务器 127.0.0.1:12345> SLAVEOF 127.0.0.1 6379127.0.0.1:6379将被设置为127.0.0.1:123456的主服务器 旧版复制功能的实现 Redis的复制功能分为同步&a…...
Leangoo领歌 -敏捷任务管理软件,任务管理更轻松更透明
任务管理,简单易懂,就是对任务进行管理。那怎么可以更好进行任务管理呢?怎么样样可以让任务进度可视化,一目了然呢?有效的管理可以让我们事半功倍。 接下来我们看一下如何借助任务管理软件高效的做任务管理。 首先…...
go的iris框架进行本地资源映射到服务端
我这里使用的是HandleDirapi,有其他的请补充 package mainimport ("github.com/kataras/iris/v12" )type Hello struct{Status int json:"status"Message string json:"message" }func main(){app : iris.New()//第一个api:相当于首页app.Get(&q…...
代码随想录day46|139. 单词拆分
139. 单词拆分 class Solution:def wordBreak(self, s: str, wordDict: List[str]) -> bool:dp [False]*(len(s)1)dp[0]Truefor i in range(len(s)1):for j in wordDict:if i>len(j) and (s[i-len(j):i] in wordDict) and dp[i-len(j)]:dp[i] Truereturn dp[len(s)]多…...
MATLAB实现函数拟合
目录 一.理论知识 1.拟合与插值的区别 2.几何意义 3.误差分析 二.操作实现 1.数据准备 2.使用cftool——拟合工具箱 三.函数拟合典例 四.代码扩展 一.理论知识 1.拟合与插值的区别 通俗的说,插值的本质是根据现有离散点的信息创建出更多的离散点…...
vue优化首屏加载时间优化-cdn引入第三方包
前言 为什么要进行首屏加载优化,因为随着我们静态资源和第三方包和代码增加,压缩之后包会越来越大 随着网络的影响,在我们第一输入url请求资源时候,网络阻塞,加载时间长,用户体验不好 仔细观察后就会发现…...
lv4 嵌入式开发-3 标准IO的读写
目录 1 标准I/O – 读写流 2 标准I/O – 按字符输入 3 标准I/O – 按字符输出 4 标准I/O – 思考和练习 5 标准I/O – 按行输入 6 标准I/O – 按行输出 7 标准I/O – 思考和练习 1 标准I/O – 读写流 流支持不同的读写方式: 读写一个字符:fgetc()/fputc()一…...
iOS UIDevice设备信息
识别设备和操作系统 //获得共享设备实例 open class var current: UIDevice { get }//识别设备的名称 open var name: String { get } // e.g. "My iPhone"//设备类型 open var model: String { get } // e.g. "iPhone", "iPod touch"//本地化设…...
SLAM ORB-SLAM2(2)编译安装
SLAM ORB-SLAM2(2)编译安装 1. 软件包依赖安装2. 依赖安装2.1. Eigen2.2. Pangolin2.3. OpenCV3. ORB-SLAM23.1. 源码下载3.2. 文件修改3.3. 扩大交换空间3.4. 编译1. 软件包依赖安装 以一个纯净的ubuntu20.04桌面版为例 1.首先设置软件源为清华源 2.安装必要依赖 sudo ap…...
第11节-PhotoShop基础课程-索套工具
文章目录 前言1.索套工具 选中后按Ctrl 可以移动2.加,减,交叉 shift alt 2.多边形索套工具 手动首尾相连 或者双击空地1.单击绘制直线选区2.双击结束绘制3.加,减,交叉4. delete可以删除节点 3.磁性索套工具1.沿着边缘自动吸附2.可…...
Json字符串内容比较-超实用版
背景 之前有类似接口diff对比,数据对比的测试需求,涉及到json格式的数据对比,调研了几个大神们分享的代码,选了一个最符合自己需求的研究了下。 说明 这个对比方法,支持JsonObject和JsonArray类型的数据对比&#x…...
别的网站做相关链接怎么做/知名网站
Introduction了解在设计Java API时应该应用的一些API设计实践。通常,这些实践很有用,并确保API可以在模块化环境中正确使用,例如OSGi和Java平台模块系统(JPMS)。有些做法是规定性的,有些则是禁止性的。当然…...
顺义公司网站建设/拉新推广渠道
2019独角兽企业重金招聘Python工程师标准>>> https://www.dusaiphoto.com/ https://github.com/stacklens/django_blog_tutorial https://www.zmrenwu.com/post/2/ http://www.spiderpy.cn/blog/ 转载于:https://my.oschina.net/attacker/blog/3007264...
品牌网站建设完善大蝌蚪/微博推广费用一般多少
在前面文章《矩阵的四个基本子空间》中提到: 一个秩为r,m*n的矩阵A中,其行空间和列空间的维数为r,零空间和左零空间的维数分别为n-r,m-r,并且有行空间与零空间正交,列空间与左零空间正交。“掌握…...
什么网站可以帮忙做任务赚钱/免费建站哪个比较好
模仿C#的StringBuilder类,还有很多函数需要慢慢完善的以前写javascript的时候,有个js的Stringbuilder类,说是效率高,不知道在php下,这样处理字符串组合时不时也会效率高呢?请高手指点<?clas…...
wordpress网页如何写/域名查询网址
在后台使用命令查看pip安装版本: pip -V 例如: ➜ log git:(master_6.0.0) ✗ pip -V pip 9.0.3 from /Users/mymac/.pyenv/versions/3.6.5/lib/python3.6/site-packages (python 3.6)...
永川做网站的公司/百度大数据平台
代码 关键字: javascript this用法小结this是JavaScript中功能最强大的关键字之一。不幸的是,如果你不知道它具体怎么工作,你将很难正确使用它。this是面向对象语言中的一个重要概念,在JAVA,C#等大型语言中,this固定指向运行时的当…...