如何解决微服务下引起的 分布式事务问题
一、什么是分布式事务?
虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。
-
跨数据源的分布式事务
-
跨服务的分布式事务
二、解决方案
1、使用阿里开源的Seata框架解决分布式事务
1)seata的架构
Seata事务管理中有三个重要的角色:
-
TC (Transaction Coordinator) - **事务协调者:**维护全局和分支事务的状态,协调全局事务提交或回滚。
-
TM (Transaction Manager) - **事务管理器:**定义全局事务的范围、开始全局事务、提交或回滚全局事务。
-
RM (Resource Manager) - **资源管理器:**管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
3)Seata的常用模式使用-
XA模式
在一阶段各个本地事务执行完成后,不提交,把执行状态给事务协调者TC,此时本地事务继续持有数据库锁
二阶段TC基于一阶段的报告来进行判断,如果一阶段均成功则通知所有的事务参与者,提交事务,如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务。
优点:能够实现强一致性,满足ACID原则;实现简单
缺点:性能较差;依赖数据库的事务
I. 在application.yml文件中开启XA模式(所有参与事务的服务都需要设置):
seata:data-source-proxy-mode: XA
II. 在全局事务的入口方法添加@GlobalTransactional注解
@Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
-
AT模式
和xa模式一样也是二阶段提交,不同的是AT模式本地事务结束后,直接提交。但是,它会在本地事务进行数据库数据更新的时候记录一下更新前后的快照。
-
在二阶段需要回滚的时候,根据快照进行数据的恢复,如果二阶段全局事务提交,则把记录的快照删除。
优点:性能好;实现也较为简单
缺点: 存在中间状态,只能达到最终的一致性;快照功能会影响一些性能,但是相对于XA模式还是要好很多
I. 在application.yml文件中开启AT模式(所有参与事务的服务都需要设置):
seata:data-source-proxy-mode: AT # 默认就是AT
II. 创建相关数据库表
#在分支事务所在的库里创建记录快照的表undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` longblob NOT NULL COMMENT 'rollback info',`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` datetime(6) NOT NULL COMMENT 'create datetime',`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;#在TC服务所使用的库里创建全局锁记录表lock_table
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (`row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,`xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`transaction_id` bigint(20) NULL DEFAULT NULL,`branch_id` bigint(20) NOT NULL,`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`gmt_create` datetime NULL DEFAULT NULL,`gmt_modified` datetime NULL DEFAULT NULL,PRIMARY KEY (`row_key`) USING BTREE,INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
III. 在全局事务的入口方法添加@GlobalTransactional注解
@Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
2、使用RocketMQ实现可靠消息最终一致性方案 (适用于不同项目的情况)
模拟转账 a银行向b银行转账
a银行业务代码:
减少金额,像mq发送事务消息
- 引入rocketmq依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq‐spring‐boot‐starter</artifactId><version>2.0.2</version>
</dependency>
2)配置rocketmq
rocketmq.producer.group = zhuoye #设置生产者组的名称
rocketmq.name‐server = 127.0.0.1:9876 #指定rocketmq的地址
3) 业务层代码
@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//向mq发送转账消息@Overridepublic void sendTransferAccountsMessagesToMq(AccountChangeEvent accountChangeEvent) {//将accountChangeEvent转成jsonJSONObject jsonObject =new JSONObject();jsonObject.put("transferAccountInfo",accountChangeEvent);String jsonString = jsonObject.toJSONString();//生成message类型Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送一条事务消息/*** String txProducerGroup 生产组* String destination topic,* Message<?> message, 消息内容* Object arg 参数*/ rocketMQTemplate.sendMessageInTransaction("transferAccount_ABank","topic_transferAccount",message,null);}//更新账户,扣减金额@Override@Transactionalpublic void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//幂等判断if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//扣减金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);//添加事务日志tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}
4)编写RocketMQLocalTransactionListener接口实现类
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "transferAccount_ABank")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {@Autowiredprivate UserAccountService userAccountService;@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//执行本地事务,扣减金额userAccountService.doUpdateAccountBalance(accountChangeEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}//事务状态回查,查询是否扣减金额@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//事务idString txNo = accountChangeEvent.getTxNo();int isExist = tansactionalRecordMapper.isExist(txNo);if(isExist>0){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}
}
b银行业务代码(前两步一样):
接收消息,增加金额
1)业务层代码
@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//更新账户,增加金额@Override@Transactionalpublic void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {//已更新if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//增加金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());//添加事务记录,用于幂等tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}
}
2)监听事务消息
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "transferAccount_ABank",topic = "topic_transferAccount",maxReconsumeTimes = 3)
public class TxmsgConsumer implements RocketMQListener<String> {@AutowiredUserAccountService userAccountService;//接收消息@Overridepublic void onMessage(String message) {//解析消息JSONObject jsonObject = JSONObject.parseObject(message);String accountChangeString = jsonObject.getString("transferAccountInfo");//转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//更新本地账户,增加金额userAccountService.addAccountInfoBalance(accountChangeEvent);}
}
相关文章:
如何解决微服务下引起的 分布式事务问题
一、什么是分布式事务? 虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。 跨数据源的分布式事务 跨服务的分布式事务 二、解决方…...
牛客周赛50轮+cf955+abc363
D-小红的因式分解_牛客周赛 Round 50 (nowcoder.com) 思路: 巨蠢的题目,ax^2bxca1*a2*x^2(b1*a2b2*a1)xb1*b2,即: aa1*a2,ba1*b2a2*b1,cb1*b2 数据范围很小,直接暴力枚举吧(注意条件) 代码…...
【MySQL】:对库和表的基本操作方法
数据库使用的介绍 什么是SQL 学习数据库的使用——>基于 SQL编程语言 来对数据库进行操作 重点表述的是“需求”,期望得到什么结果。(至于结果是如何得到的,并不关键,都是数据库服务器在背后做好了) 重点表述的是…...
Library not found for -lstdc++.6.0.9
解决方案一 由于项目已经很多年了,前段时间更新了Xcode发现编译报错lstdc这个库很早以前就被舍弃了,但是一个项目的维护都随着解决bug堆砌出来的,这也导致了我们的项目走上了这条路。 比如 Library not found for -lstdc.6.0.9 报的错&#x…...
防火墙之双机热备篇
为什么要在防火墙上配置双机热备技术呢? 相信大家都知道,为了提高可靠性,避免单点故障 肯定有聪明的小伙伴会想到那为什么不直接多配置两台防火墙,然后再将他们进行线路冗余,不就完成备份了吗? 答案是不…...
终端里面ifconfig命令无法运行
在 Ubuntu 以及基于 Debian 的系统中,ifconfig 命令可能不会默认安装,因为自 Ubuntu 17.10 版本开始,系统默认使用 ip 命令作为网络配置的主要工具,而 ifconfig 命令则来自 net-tools 包,该包不再作为标准工具被包含在…...
掌握Python中的文件序列化:Json和Pickle模块解析
Python 文件操作与管理:Open函数、Json与Pickle、Os模块 在Python中,文件是一个重要的数据处理对象。无论是读取数据、保存数据还是进行数据处理,文件操作都是Python编程中不可或缺的一部分。本文将详细介绍Python中文件操作的几种常用方法&…...
WordPress 6.6 “Dorsey多尔西”发布
WordPress 6.6 “Dorsey多尔西”已经发布,它以传奇的美国大乐队领袖 Tommy Dorsey 名字命名。Dorsey 以其音调流畅的长号和作品而闻名,他的音乐以其情感深度和充满活力的能量吸引了观众。 当您探索 WordPress 6.6 的新功能和增强功能时,让您的…...
核函数支持向量机(Kernel SVM)
核函数支持向量机(Kernel SVM)是一种非常强大的分类器,能够在非线性数据集上实现良好的分类效果。以下是关于核函数支持向量机的详细数学模型理论知识推导、实施步骤与参数解读,以及两个多维数据实例(一个未优化模型&a…...
二分查找(折半查找)
这次不排序了,对排好序的数组做个查找吧 介绍 二分查找排序英文名为BinarySort,是一种效率较高的查找方法要求线性表必须采用顺序存储结构 基本思路 通过不断地将搜索范围缩小一半来找到目标元素: 1、假定数组为arr,需要查找的…...
arcgis紧凑型切片缓存(解决大范围切片,文件数量大的问题)
ArcGIS 切片缓存的紧凑型存储格式是一种优化的存储方式,用于提高切片缓存的存储效率和访问速度。紧凑型存储格式将多个切片文件合并为一个单一的 .bundle 文件,从而减少文件系统的开销和切片的加载时间。这类格式已经应用很久了,我记得2013我…...
ESP32CAM人工智能教学15
ESP32CAM人工智能教学15 Flask服务器TCP连接 小智利用Flask在计算机中创建一个虚拟的网页服务器服务器,让ESP32Cam通过WiFi连接,把摄像头拍摄到的图片发送到电脑中,并在电脑中保存成图片文件。 Flask是用Python编写的网页服务程序WebServer。…...
Pandas 33个冷知识 0721
Pandas 33个冷知识 从Excel读取数据: 使用 pd.read_excel(file.xlsx) 来读取Excel文件。 写入Excel: 使用 df.to_excel(file.xlsx, indexFalse) 将DataFrame写入Excel文件。 创建日期索引: 使用 df.set_index(pd.to_datetime(df[date])) 创建日期索引。 向后填充缺失值: 使用…...
C++ map和set的使用
目录 0.前言 1.关联式容器 2.键值对 3.树形结构的关联式容器 3.1树形结构的特点 3.2树形结构在关联式容器中的应用 4.set 4.1概念与性质 4.2使用 5.multiset 5.1概念与性质 5.2使用 6.map 6.1概念与性质 6.2使用 7.multimap 7.1概念与性质 7.2使用 8.小结 &a…...
yarn的安装和配置以及更新总结,npm的对照使用差异
1. Yarn简介 Yarn 是一个由 Facebook 开发的现代 JavaScript 包管理器,旨在提供更快、更安全、更可靠的包管理体验。 1.1 什么是Yarn Yarn 是一个快速、可靠和安全的 JavaScript 包管理器,它通过并行化操作和智能缓存机制,显著提升了依赖安…...
【Git命令】git rebase之合并提交记录
使用场景 在本地提交了两个commit,但是发现根本没有没必要分为两次,需要想办法把两次提交合并成一个提交;这个时候可以使用如下命令启动交互式变基会话: git rebase -i HEAD~N这里 N 是你想要重新调整的最近的提交数。 如下在本地…...
为什么品牌需要做 IP 形象?
品牌做IP形象的原因有多方面,这些原因共同构成了IP形象在品牌建设中的重要性和价值,主要原因有以下几个方面: 增强品牌识别度与记忆点: IP形象作为品牌的视觉符号,具有独特性和辨识性,能够在消费者心中留…...
Kubernetes 1.24 版弃用 Dockershim 后如何迁移到 containerd 和 CRI-O
在本系列的上一篇文章中,我们讨论了什么是 CRI 和 OCI,Docker、containerd、CRI-O 之间的区别以及它们的架构等。最近,我们得知 Docker 即将从 kubernetes 中弃用!(查看 kubernetes 官方的这篇文章)那么让我…...
70. 爬楼梯【 力扣(LeetCode) 】
一、题目描述 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢? 二、测试用例 示例 1: 输入:n 2 输出:2 解释:有两种方法可以爬到楼顶。 1. 1 阶…...
R语言优雅的把数据基线表(表一)导出到word
基线表(Baseline Table)是医学研究中常用的一种数据表格,用于在研究开始时呈现参与者的初始特征和状态。这些特征通常包括人口统计学数据、健康状况和疾病史、临床指标、实验室检测、生活方式、社会经济等。 本人在既往文章《scitb包1.6版本发…...
XMl基本操作
引言 使⽤Mybatis的注解⽅式,主要是来完成⼀些简单的增删改查功能. 如果需要实现复杂的SQL功能,建议使⽤XML来配置映射语句,也就是将SQL语句写在XML配置⽂件中. 之前,我们学习了,用注解的方式来实现MyBatis 接下来我们…...
Linux——Shell脚本和Nginx反向代理服务器
1. Linux中的shell脚本【了解】 1.1 什么是shell Shell是一个用C语言编写的程序,它是用户使用Linux的桥梁 Shell 既是一种命令语言,有是一种程序设计语言 Shell是指一种应用程序,这个应用程序提供了一个界面,用户通过这个界面访问…...
pyspark使用 graphframes创建和查询图的方法
1、安装graphframes的步骤 1.1 查看 spark 和 scala版本 在终端输入: spark-shell --version 查看spark 和scala版本 1.2 在maven库中下载对应版本的graphframes https://mvnrepository.com/artifact/graphframes/graphframes 我这里需要的是spark 2.4 scala 2.…...
【web】-flask-简单的计算题(不简单)
打开页面是这样的 初步思路,打开F12,查看头,都发现了这个表达式的base64加密字符串。编写脚本提交答案,发现不对; 无奈点开source发现源代码,是flask,初始化表达式,获取提交的表达式࿰…...
Apache Sqoop
Apache Sqoop是一个开源工具,用于在Apache Hadoop和关系型数据库(如MySQL、Oracle、PostgreSQL等)之间进行数据的批量传输。其主要功能包括: 1. 数据导入:从关系型数据库(如MySQL、Oracle等)中将…...
【Python】TensorFlow介绍与实战
TensorFlow介绍与使用 1. 前言 在人工智能领域的快速发展中,深度学习框架的选择至关重要。TensorFlow 以其灵活性和强大的社区支持,成为了许多研究者和开发者的首选。本文将进一步扩展对 TensorFlow 的介绍,包括其优势、应用场景以及在最新…...
第100+16步 ChatGPT学习:R实现Xgboost分类
基于R 4.2.2版本演示 一、写在前面 有不少大佬问做机器学习分类能不能用R语言,不想学Python咯。 答曰:可!用GPT或者Kimi转一下就得了呗。 加上最近也没啥内容写了,就帮各位搬运一下吧。 二、R代码实现Xgboost分类 (…...
【操作系统】定时器(Timer)的实现
这里写目录标题 定时器一、定时器是什么二、标准库中的定时器三、实现定时器 定时器 一、定时器是什么 定时器也是软件开发中的⼀个重要组件.类似于⼀个"闹钟".达到⼀个设定的时间之后,就执行某个指定 好的代码. 定时器是⼀种实际开发中⾮常常用的组件. ⽐如⽹络通…...
鸿蒙Navigation路由能力汇总
基本使用步骤: 1、新增配置文件router_map: 2、在moudle.json5中添加刚才新增的router_map配置: 3、使用方法: 属性汇总: https://developer.huawei.com/consumer/cn/doc/harmonyos-references/ts-basic-compone…...
1:1公有云能力整体输出,腾讯云“七剑”下云端
【全球云观察 | 科技热点关注】 曾几何时,云计算技术的兴起,为千行万业的数字化创新带来了诸多新机遇,同时也催生了新产业新业态新模式,激发出高质量发展的科技新动能。很显然,如今的云创新已成为高质量发…...
沧浪网站建设/手机百度网盘下载慢怎么解决
本文实例讲述了python实现爬虫抓取小说功能。分享给大家供大家参考,具体如下: # -*- coding: utf-8 -*- from bs4 import BeautifulSoup from urllib import request import re import os,time #访问url,返回html页面 def get_html(url): req…...
wordpress添加用户/宣传推广
题意大概是这样,给你一个字符串,你可以进行的操作是这样的, 每次拿走这个串的第一个字母,或者最后一个字母,然后放到 一个新串的末尾(当然啦,新串一开始是为空的),当把旧…...
一个网站如何做推广/seo优化入门教程
从MySQL5.6开始,mysqlbinlog支持将远程服务器上的binlog实时复制到本地服务器上。mysqlbinlog的实时二进制复制功能并非简单的将远程服务器的日志复制过来,它是通过MySQL 5.6公布的Replication API实时获取二进制事件。本质上,就相当于MySQL的…...
web开发工具下载/佛山网站优化
关注头条号,私信回复资料会有意外惊喜呦………………最后一张照片有资料。不管是参加Kaggle比赛,还是开发一个深度学习应用,第一步总是数据分析。这篇文章介绍了8个使用Python进行数据分析的方法,不仅能够提升运行效率,…...
app服务器搭建教程/长春百度seo公司
上来就是正在安装,一点选择的机会都没有 解决方案: 点击更多系统和语言下载 自行选择下载的类型,这个有选择方案...
免备案空间网站/地方网站建设
首先在oracle中没有datediff()函数可以用以下方法在oracle中实现该函数的功能:1.利用日期间的加减运算天:ROUND(TO_NUMBER(END_DATE - START_DATE))小时:ROUND(TO_NUMBER(END_DATE - START_DATE) * 24)分钟:ROUND(TO_NUMBER(END_DATE - START…...