网站建设企业资质等级/列举常见的网络营销工具
Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务
文章目录
- Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务
- 0.前言
- 1. 基础介绍
- ConnectionFactory
- AbstractRoutingDataSource 动态路由数据源的抽象类
- DynamicLocalTransactionInterceptor 动态的本地事务拦截器
- 3. 使用步骤示例
- 4. 官方源码分析
- 5. 参考资料
0.前言
背景
处理多数据源事务一直是一个复杂而棘手的问题,通常我们有两种主流的解决方法。
第一种是通过Atomikos手动创建多数据源事务,这种方法更适合数据源数量较少,参数配置不复杂,对性能要求不高的项目。然而,这种方法的最大困难在于需要手动配置大量设置,这可能会消耗大量时间。
第二种是通过使用Seata等分布式事务解决方案。这种方法的难点在于需要建立并维护像Seata-server这样的统一管理中心。
今天我们使用Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 实现分布式事务和本地多数据源事务。
每种解决方案都有其适用的场景,然而在实际操作中,我经常接到如下的问题:
“我为什么在添加了事务注解之后,数据源切换还是失败了?”
“我了解到这涉及到分布式事务,但我并不想使用Seata。我的场景比较简单,有没有不需要依赖第三方的解决方案?”
这些问题突显出在现实工作中,我们可能需要更灵活、更简便的解决方案来处理多数据源事务问题。
1. 基础介绍
自从3.3.0
开始,由seata
的核心贡献者https://github.com/a364176773 贡献了基于connection
代理的方案。
完整代码 https://github.com/baomidou/dynamic-datasource-spring-boot-starter/commit/f0cbad193528296eeb64faa76c79743afbdd811d
建议从3.4.0
版本开始使用,其修复了一个功能,老版本不加@DS
只加@DSTransactional
会报错。
核心的几处代码
@Role(value = BeanDefinition.ROLE_INFRASTRUCTURE)@ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "seata", havingValue = "false",matchIfMissing = true)@Beanpublic Advisor localTransactionAdvisor() {AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)");return new DefaultPointcutAdvisor(pointcut, new DynamicTransactionAdvisor());}
我们可以看到通过spring.datasource.dynamic.seata=true
配置来启用条件注解。这个是dynamic-datasource
支持seata事务的开发和入口。
ConnectionFactory
ConnectionFactory
是一个工厂类,主要的作用是管理数据库连接,并提供获取和存储数据库连接的功能。
-
存储每个线程独立的数据库连接:
ConnectionFactory
使用ThreadLocal
为每个线程提供其自己的数据库连接池,这样可以防止在多线程环境中数据库连接的混乱。 -
提供获取数据库连接的方法:
ConnectionFactory
提供getConnection
方法,使得在同一个线程中的多个模块可以共享同一个数据库连接。 -
提供存储数据库连接的方法:
ConnectionFactory
提供putConnection
方法,可以存储新的数据库连接到当前线程的数据库连接池中。 -
提供通知数据库连接的方法:
ConnectionFactory
提供notify
方法,可以对当前线程的所有数据库连接进行统一的操作,比如提交或者回滚事务。
通过这些功能,ConnectionFactory
实现了数据库连接的有效管理,保证了在同一线程中对多个数据库进行操作时,可以共享同一连接,实现事务管理。核心代码如下。大家可以借鉴
package com.baomidou.dynamic.datasource.tx;import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author funkye*/
public class ConnectionFactory {// 使用ThreadLocal来保存与当前线程相关的数据库连接信息,以Map形式存储,Map中的key为数据源名称,value为对应的数据库连接代理类private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =new ThreadLocal<Map<String, ConnectionProxy>>() {@Overrideprotected Map<String, ConnectionProxy> initialValue() {return new ConcurrentHashMap<>(8);}};// 存储数据库连接到当前线程的连接池中,如果当前线程的连接池中没有该数据源的连接,则新建一个并放入public static void putConnection(String ds, ConnectionProxy connection) {Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();if (!concurrentHashMap.containsKey(ds)) {try {connection.setAutoCommit(false);} catch (SQLException e) {e.printStackTrace();}concurrentHashMap.put(ds, connection);}}// 从当前线程的连接池中获取指定数据源的数据库连接public static ConnectionProxy getConnection(String ds) {return CONNECTION_HOLDER.get().get(ds);}// 对当前线程的所有数据库连接执行通知操作,根据参数state决定是提交还是回滚,如果在执行过程中发生错误,则在所有连接处理完后抛出public static void notify(Boolean state) throws Exception {Exception exception = null;try {Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {try {connectionProxy.notify(state);} catch (SQLException e) {exception = e;}}} finally {CONNECTION_HOLDER.remove(); //清除当前线程的连接池if (exception != null) {throw exception;}}}}
AbstractRoutingDataSource 动态路由数据源的抽象类
动态路由数据源的抽象类,用于根据不同的业务需要,动态地选择需要使用的数据源。关键的方法是getConnection()
和getConnection(String username, String password)
,这两个方法会根据当前是否存在全局事务来动态地选择获取原始的数据库连接还是数据库连接代理。
public abstract class AbstractRoutingDataSource extends AbstractDataSource {// 抽象方法,子类需要实现该方法以确定数据源protected abstract DataSource determineDataSource();// 抽象方法,子类需要实现该方法以确定默认的数据源名称protected abstract String getPrimary();// 获取数据库连接,根据事务上下文中是否有XID来判断是否需要获取代理连接@Overridepublic Connection getConnection() throws SQLException {String xid = TransactionContext.getXID();if (StringUtils.isEmpty(xid)) {// 如果没有XID,说明当前不处于全局事务中,直接获取原始连接return determineDataSource().getConnection();} else {// 如果有XID,说明当前处于全局事务中,需要获取代理连接String ds = DynamicDataSourceContextHolder.peek();ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;ConnectionProxy connection = ConnectionFactory.getConnection(ds);return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;}}// 与上面的方法类似,只不过这个方法可以传入用户名和密码来获取数据库连接@Overridepublic Connection getConnection(String username, String password) throws SQLException {String xid = TransactionContext.getXID();if (StringUtils.isEmpty(xid)) {return determineDataSource().getConnection(username, password);} else {String ds = DynamicDataSourceContextHolder.peek();ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;ConnectionProxy connection = ConnectionFactory.getConnection(ds);return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password)): connection;}}// 创建数据库连接代理,并将代理连接放入连接工厂private Connection getConnectionProxy(String ds, Connection connection) {ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds);ConnectionFactory.putConnection(ds, connectionProxy);return connectionProxy;}// 获取指定类型的代理对象@Override@SuppressWarnings("unchecked")public <T> T unwrap(Class<T> iface) throws SQLException {if (iface.isInstance(this)) {return (T) this;}return determineDataSource().unwrap(iface);}// 判断是否是指定类型的代理对象@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return (iface.isInstance(this) || determineDataSource().isWrapperFor(iface));}
}
DynamicLocalTransactionInterceptor 动态的本地事务拦截器
动态的本地事务拦截器。基本思想是在方法调用前后添加事务处理的逻辑。当这个拦截器被应用到某个方法时,那么在调用这个方法时,会首先检查当前是否已经存在事务,如果存在则直接调用原始方法。如果不存在,则会先开启一个新的事务,然后调用原始方法,方法结束后根据方法执行的结果来提交或回滚事务。入口在这,看一眼就懂了。
// 实现MethodInterceptor接口定义拦截器
public class DynamicLocalTransactionInterceptor implements MethodInterceptor {@Override// invoke方法会在原方法执行前后进行拦截public Object invoke(MethodInvocation methodInvocation) throws Throwable {// 如果当前上下文中已存在事务,则直接调用原方法,不进行拦截处理if (!StringUtils.isEmpty(TransactionContext.getXID())) {return methodInvocation.proceed();}// 定义一个状态标志,标记事务是否执行成功boolean state = true;Object o;// 开启一个新的事务LocalTxUtil.startTransaction();try {// 调用原始方法o = methodInvocation.proceed();} catch (Exception e) {// 如果原方法执行抛出异常,则标记事务执行失败state = false;throw e;} finally {// 根据事务执行状态,提交或回滚事务if (state) {LocalTxUtil.commit();} else {LocalTxUtil.rollback();}}// 返回原方法的执行结果return o;}
}
3. 使用步骤示例
官方示例:https://github.com/dynamic-datasource/dynamic-datasource-samples/tree/master/tx-samples/tx-local-sample
完整示例项目 数据库都已准备好,可以直接运行测试。http://localhost:8080/doc.html
示例项目A,B,C分别对应OrderService,ProductService,AccountService。分别是独立的数据库。
用户下单分别调用产品库扣库存,账户库扣余额。
如果库存不足,或用户余额不足都抛出RuntimeException,触发整体回滚。
@Slf4j
@Service
@AllArgsConstructor
public class OrderService {private final OrderMapper orderMapper;private final AccountService accountService;private final ProductService productService;//@DS("order") 这里不需要,因为order是默认库,如果开启事务的不是默认库则必须加@DSTransactional //注意这里开启事务public void placeOrder(PlaceOrderRequest request) {log.info("=============ORDER START=================");Long userId = request.getUserId();Long productId = request.getProductId();Integer amount = request.getAmount();log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);log.info("当前 XID: {}", TransactionContext.getXID());Order order = Order.builder().userId(userId).productId(productId).status(OrderStatus.INIT).amount(amount).build();orderMapper.insert(order);log.info("订单一阶段生成,等待扣库存付款中");// 扣减库存并计算总价Double totalPrice = productService.reduceStock(productId, amount);// 扣减余额accountService.reduceBalance(userId, totalPrice);order.setStatus(OrderStatus.SUCCESS);order.setTotalPrice(totalPrice);orderMapper.updateById(order);log.info("订单已成功下单");log.info("=============ORDER END=================");}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {private final ProductMapper productMapper;@DS("product")public Double reduceStock(Long productId, Integer amount) {log.info("=============PRODUCT START=================");log.info("当前 XID: {}", TransactionContext.getXID());// 检查库存Product product = productMapper.selectById(productId);Assert.notNull(product, "商品不存在");Integer stock = product.getStock();log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);if (stock < amount) {log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);throw new RuntimeException("库存不足");}log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());// 扣减库存int currentStock = stock - amount;product.setStock(currentStock);productMapper.updateById(product);double totalPrice = product.getPrice() * amount;log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);log.info("=============PRODUCT END=================");return totalPrice;}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class AccountService {private final AccountMapper accountMapper;@DS("account")public void reduceBalance(Long userId, Double price) {log.info("=============ACCOUNT START=================");log.info("当前 XID: {}", TransactionContext.getXID());Account account = accountMapper.selectById(userId);Assert.notNull(account, "用户不存在");Double balance = account.getBalance();log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);if (balance < price) {log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);throw new RuntimeException("余额不足");}log.info("开始扣减用户 {} 余额", userId);double currentBalance = account.getBalance() - price;account.setBalance(currentBalance);accountMapper.updateById(account);log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);log.info("=============ACCOUNT END=================");}
}
4. 官方源码分析
5. 参考资料
- dynamic-datasource GitHub 仓库 ↗:dynamic-datasource 的官方 GitHub 仓库,包含源代码、文档和示例等资源。
相关文章:

Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 分布式事务
Springbootmybatis-plusdynamic-datasourceDruid 多数据源事务,分布式事务 文章目录 Springbootmybatis-plusdynamic-datasourceDruid 多数据源事务,分布式事务0.前言1. 基础介绍ConnectionFactoryAbstractRoutingDataSource 动态路由数据源的抽象类 Dyn…...

673. 最长递增子序列的个数
673. 最长递增子序列的个数 原题链接:完成情况:解题思路:方法一:动态规划方法二:贪心 前缀和 二分查找 参考代码:__673最长递增子序列的个数__动态规划__673最长递增子序列的个数__贪心_前缀和_二分查找…...

Android12之ABuffer数据处理(三十四)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧和懒惰的唯一良药. 更多原创,欢迎关注:Android…...

whisper 语音识别项目部署
1.安装anaconda软件 在如下网盘免费获取软件: 链接:https://pan.baidu.com/s/1zOZCQOeiDhx6ebHh5zNasA 提取码:hfnd 2.使用conda命令创建python3.8环境 conda create -n whisper python3.83.进入whisper虚拟环境 conda activate whisper4.…...

实例044 在关闭窗口前加入确认对话框
实例说明 用户对程序进行操作时,难免会有错误操作的情况,例如不小心关闭程序,如果尚有许多资料没有保存,那么损失将非常严重,所以最好使程序具有灵活的交互性。人机交互过程一般都是通过对话框来实现的,对话…...

子查询和事务隔离以及用户管理
一、子查询 子查询是另一个语句中的select语句嵌套在另一个select中。注意子查询语法上必须使用()包起来。 嵌套的那个语句返回的结果有可能是: 一个字段,一行记录,一个列或一个表。嵌套的位置 where / having语句里面作为条件使用在from语…...

uniapp 滚动到指定元素的位置(锚点)
需求:在页面中,不管位于何处,点击按钮页面滚动到对应的标题位置。 最简单有效的方式(直接复制改数据就行) 使用 scroll-view 标签的属性:scroll-top(距离值 num) 或 scroll-into-view(子元素的id,不能以…...

Spring AOP 的 afterReturing 返回值是否能修改问题
文章目录 结论举例子原因外传 结论 最近要搞脱敏信息,所以,想了几种方案,最后使用全局的接口拦截,但是,又不能用注解的方式,毕竟是几年的老产品,有很多限制。 中间尝试过使用Spring AOP 的 aft…...

MyBatis分页插件PageHelper的使用及特殊字符的处理
目录 一、PageHelper简介 1.什么是分页 2.PageHelper是什么 3.使用PageHelper的优点 二、PageHelper插件的使用 原生limit查询 1. 导入pom依赖 2. Mybatis.cfg.xml 配置拦截器 3. 使用PageHelper进行分页 三、特殊字符的处理 1.SQL注入: 2.XML转义&#…...

[语音识别] 基于Python构建简易的音频录制与语音识别应用
语音识别技术的快速发展为实现更多智能化应用提供了无限可能。本文旨在介绍一个基于Python实现的简易音频录制与语音识别应用。文章简要介绍相关技术的应用,重点放在音频录制方面,而语音识别则关注于调用相关的语音识别库。本文将首先概述一些音频基础概…...

Matlab彩色图像转索引图像
索引图像 索引图像是一种把像素值直接作为RGB调色板下标的图像。索引图像包括一个数据矩阵X,一个调色板矩阵map,也称为颜色映像矩阵。其中,数据矩阵X可以是8位无符号整型、16位无符号整型或双精度类型。调色板矩阵map是一个m3的数据阵列&…...

测试框架pytest教程(11)-pytestAPI
常量 pytest.__version__ #输出pytest版本 pytest.version_tuple #输出版本的元组形式 功能 pytest.approx pytest.approx 是一个用于进行数值近似比较的 pytest 断言工具。 在测试中,有时候需要对浮点数或其他具有小数部分的数值进行比较。然而,由于…...

Docker自学:利用FastAPI建立一个简单的web app
环境配置:下载Docker Desktop 文件一:main.py from typing import Unionfrom fastapi import FastAPIimport uvicornapp FastAPI()app.get("/") def read_root():return {"Hello": "World"}app.get("/items/{item…...

微调bert做学术论文分类(以科大讯飞学术论文分类挑战赛为例)
代码 12-How to Fine-Tune BERT for Text Classification:链接:https://pan.baidu.com/s/1EKggbyC4ZW-ufnDW45eKzA 提取码:k3b2 baseline 链接:https://pan.baidu.com/s/12hkZNJjQ__FGAHiF3fifvQ 提取码:88tb 数据…...

Springboot中sharding-jdbc的API模式并使用自定义算法
Springboot中sharding-jdbc的API模式并使用自定义算法 可配合AbstractRoutingData使用切换数据源 程序用到了AbstractRoutingData来切换数据源(数据源是自定义的格式编写并没有用springboot的自动装配的格式写),但是又用到sharding-jdbc进行…...

MySQL回表是什么?哪些情况下会回表
🏆作者简介,黑夜开发者,全栈领域新星创作者✌,CSDN博客专家,阿里云社区专家博主,2023年6月CSDN上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责…...

VR、AR、MR 傻傻分不清楚?区别的底层逻辑?
VR是一种能够制作虚拟物体并与人互动的基础技术。它与操作者所处的环境无关。AR可以让在特定位置出现或消失。MR可以让虚拟物体与真实物体进行互动。 AR和MR的大部分应用场景都是随机的,所以硬件基本都采用手机和眼镜。提升了便携性。牺牲了性能。这就导致了AR与MR…...

VScode运行C语言出现的调试问题 lauch:program does not exist 解决方法
"lauch:program does not exist"错误通常表示编译器或调试器无法找到指定的可执行文件。这可能是由于几个原因引起的。首先,确保你的源代码文件夹路径不包含中文字符,因为这可能导致编译器无法识别文件。其次,检查你的launch.json文…...

云原生安全:保护现代化应用的新一代安全策略
随着云计算和容器技术的快速发展,云原生应用已成为现代化软件开发和部署的主流趋势。然而,随之而来的安全挑战也变得更加复杂和严峻。本文将深入探讨云原生安全的概念、原则和最佳实践,帮助您理解如何有效保护云原生应用和敏感数据。 第一部…...

mysql操作
1、字符转Decimal CAST(column AS DECIMAL(9,2)) 2、将计算结果取两位小数: round(column, 2) 3、查询非空 select * from table_XX where id is not null; 4、连表update更新 update a inner join (select yy from b) c on a.id c.id set a.xx c.yy...

前端(十四)——DOM节点操作手册:你需要了解的一切
🙂博主:小猫娃来啦 🙂文章核心:DOM节点操作手册:你需要了解的一切 文章目录 前言DOM基础知识操作现有节点创建新节点遍历节点树修改节点属性和样式事件处理实践应用动态创建表格动态更新列表 前言 DOM(文档…...

PDF怎么转成PPT文件免费?一个软件解决
随着科技的不断发展和进步,电子文档已经成为我们日常工作和学习中不可或缺的一部分。PDF作为一种跨平台的文件格式,以其可靠性和易读性而备受推崇。然而,在某些情况下,我们可能需要PDF怎么转成PPT文件免费,以便更好地展…...

数据结构基础:P3-树(上)----编程作业02:List Leaves
本系列文章为浙江大学陈越、何钦铭数据结构学习笔记,系列文章链接如下: 数据结构(陈越、何钦铭)学习笔记 文章目录 一、题目描述二、整体思路与实现代码 一、题目描述 题目描述: 给定一棵树,按照从上到下、从左到右的顺序列出所有…...

山西电力市场日前价格预测【2023-08-25】
日前价格预测 预测明日(2023-08-25)山西电力市场全天平均日前电价为314.22元/MWh。其中,最高日前电价为336.17元/MWh,预计出现在18: 30。最低日前电价为283.05元/MWh,预计出现在24: 00。 价差方向预测 1: 实…...

手机无人直播软件,有哪些优势?
近年来,随着手机直播的流行和直播带货的市场越来越大,手机无人直播软件成为许多商家开播带货的首选。在这个领域里,声音人无人直播系统以其独特的优势,成为市场上备受瞩目的产品。接下来,我们将探讨手机无人直播软件给…...

SpringBoot概述SpringBoot基础配置yml的使用多环境启动
🐌个人主页: 🐌 叶落闲庭 💨我的专栏:💨 c语言 数据结构 javaEE 操作系统 石可破也,而不可夺坚;丹可磨也,而不可夺赤。 SpringBoot简介 一、 SpringBoot概述1.1 起步依赖…...

Python Pandas 处理Excel数据 制图
目录 1、饼状图 2、条形统计图 1、饼状图 import pandas as pd import matplotlib.pyplot as plt import numpy as np #from matplotlib.ticker import MaxNLocator # 解决中文乱码 plt.rcParams[font.sans-serif][SimHei] plt.rcParams[font.sans-serif]Microsoft YaHei …...

如何自己实现一个丝滑的流程图绘制工具(五)bpmn的xml和json互转
背景 因为服务端给的数据并不是xml,而且服务端要拿的数据是json,所以我们只能xml和json互转,来完成和服务端的对接 xml转json import XML from ./config/jsonxml.js/*** xml转为json* param {*} xml*/xmlToJson(xml) {const xotree new X…...

mysql--数据库的操作
数据库,是数据存储的最大单元。 1 创建数据库 create database mydatabase; 每次创建数据库的时候,都会多一个文件夹,关系型数据库是存储在磁盘当中的,所以这时候可以查看新建的数据库 2 指定字符集 MySQL中的字符集转换过程 制…...

kafka--技术文档--架构体系
架构体系 Kafka的架构体系包括以下几个部分: Producer. 消息生产者,就是向Kafka broker发送消息的客户端。Broker. 一台Kafka服务器就是一个Broker。一个集群由多个Broker组成。一个Broker可以容纳多个Topic。Topic. 可以理解为一个队列,一…...