Seata源码学习(三)-2PC核心源码解读
Seata源码分析-2PC核心源码解读
2PC提交源码流程
上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器,一旦执行拦截器,我们就会进入到其中的invoke方法,在这其中会做一些@GlobalTransactional注解的判断,如果有注解以后,会执行全局事务和全局锁,那么在执行全局事务的时候会调用handleGlobalTransaction全局事务处理器,这里主要是获取事务信息
Object handleGlobalTransaction(final MethodInvocation methodInvocation,final GlobalTransactional globalTrxAnno) throws Throwable {boolean succeed = true;try {return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {return methodInvocation.proceed();}// 获取事务名称,默认获取方法名public String name() {String name = globalTrxAnno.name();if (!StringUtils.isNullOrEmpty(name)) {return name;}return formatMethod(methodInvocation.getMethod());}/*** 解析GlobalTransactional注解属性,封装为对象* @return*/@Overridepublic TransactionInfo getTransactionInfo() {// reset the value of timeout// 获取超时时间,默认60秒int timeout = globalTrxAnno.timeoutMills();if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}// 构建事务信息对象TransactionInfo transactionInfo = new TransactionInfo();transactionInfo.setTimeOut(timeout);// 超时时间transactionInfo.setName(name()); // 事务名称transactionInfo.setPropagation(globalTrxAnno.propagation());// 事务传播transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());// 校验或占用全局锁重试间隔transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());// 校验或占用全局锁重试次数Set<RollbackRule> rollbackRules = new LinkedHashSet<>();// 其他构建信息for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : globalTrxAnno.rollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : globalTrxAnno.noRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {// 执行异常TransactionalExecutor.Code code = e.getCode();switch (code) {case RollbackDone:throw e.getOriginalException();case BeginFailure:succeed = false;failureHandler.onBeginFailure(e.getTransaction(), e.getCause());throw e.getCause();case CommitFailure:succeed = false;failureHandler.onCommitFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackFailure:failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();case RollbackRetrying:failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();default:throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));}} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}
}
在这其中,我们要关注一个重点方法execute()
其实这个方法主要的作用就是,执行事务的流程,大概一下几点:
- 获取事务信息
- 开始执行全局事务
- 发生异常全局回滚,各个数据通过undo_log表进行事务补偿
- 全局事务提交
- 清除所有资源
这个位置是非常核心的一个位置,因为我们所有的业务进来以后都会走这个位置。
这其中的第三步和第四步就是在想TC(Seata-Server)发起全局事务的提交/回滚
public Object execute(TransactionalExecutor business) throws Throwable {// 1. Get transactionInfo// 获取事务信息TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.// 获取当前事务,主要获取XidGlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.// 根据配置的不同事务传播行为,执行不同的逻辑Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {case NOT_SUPPORTED:// If transaction is existing, suspend it.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();}// Execute without transaction and return.return business.execute();case REQUIRES_NEW:// If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();tx = GlobalTransactionContext.createNew();}// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.if (notExistingTransaction(tx)) {return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,// else continue and execute with new transaction.break;case NEVER:// If transaction is existing, throw exception.if (existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));} else {// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.// 当前没有事务,则创建一个新的事务if (tx == null) {tx = GlobalTransactionContext.createNew();}// set current tx config to holderGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,// else do nothing. Of course, the hooks will still be triggered.// 开始执行全局事务beginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 执行当前业务逻辑:// 1. 在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据// 2. 执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log表中// 3. 远程调用其他应用,远程应用接收到xid,也会注册分支事务,写入branch_table及本地undo_log表// 4. 会在lock_table表中插入全局锁数据(一个分支一条)rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 发生异常全局回滚,各个数据通过undo_log表进行事务补偿completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.// 全局提交事务commitTransaction(tx);return rs;} finally {//5. clear// 清除所有资源resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}
}
如何发起全局事务
这个位置我们就看当前这个代码中的 beginTransaction(txInfo, tx);方法
// 想TC发起请求,这里采用了模板模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {triggerBeforeBegin();// 对TC发起请求tx.begin(txInfo.getTimeOut(), txInfo.getName());triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);}
}
那我们向下来看begin方法,那要注意,这里调用begin方法的是DefaultGlobalTransaction
@Override
public void begin(int timeout, String name) throws TransactionException {//判断调用者是否是TMif (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}assertXIDNull();String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}// 获取Xidxid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}
}
在向下来看begin方法,这时候使用的是(默认事务管理者)DefaultTransactionManager.begin,来真正的获取xid,其中就是传入事务的相关信息,最终TC端返回对应的全局事务Xid。
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);// 发送请求得到响应GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}//返回Xidreturn response.getXid();
}
这里采用的是Netty的通讯方式
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {// 通过Netty发送请求return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);} catch (TimeoutException toe) {throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);}
}
图解地址:https://www.processon.com/view/link/6213d58f1e0853078013c58f
相关文章:
Seata源码学习(三)-2PC核心源码解读
Seata源码分析-2PC核心源码解读 2PC提交源码流程 上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器,一旦执行拦截器,我们就会进入到其中的invoke方法,在这其中会做一些GlobalTransactional注解的判断,如果有注解…...
IO流概述
🏡个人主页 : 守夜人st 🚀系列专栏:Java …持续更新中敬请关注… 🙉博主简介:软件工程专业,在校学生,写博客是为了总结回顾一些所学知识点 目录IO流概述IO 流的分类总结流的四大类字…...
【node.js】node.js的安装和配置
文章目录前言下载和安装Path环境变量测试推荐插件总结前言 Node.js是一个在服务器端可以解析和执行JavaScript代码的运行环境,也可以说是一个运行时平台,仍然使用JavaScript作为开发语言,但是提供了一些功能性的API。 下载和安装 Node.js的官…...
Python优化算法—遗传算法
Python优化算法—遗传算法一、前言二、安装三、遗传算法3.1 自定义函数3.2 遗传算法进行整数规划3.3 遗传算法用于旅行商问题3.4 使用遗传算法进行曲线拟合一、前言 优化算法,尤其是启发式的仿生智能算法在最近很火,它适用于解决管理学,运筹…...
数据埋点(Data buried point)的应用价值剖析
一、什么是数据埋点?数据埋点指在应用中特定的流程中收集一些信息,用来跟踪应用使用的状况,后续用来进一步优化产品或是提供运营的数据支撑。比如访问数(Visits),访客数(Visitor),停…...
一文弄懂硬链接、软链接、复制的区别
复制 命令:cp file1 file2 作用:实现对file1的一个拷贝。 限制:可以跨分区,文件夹有效。 效果:修改file1,对file2无影响;修改file2,对file1无影响。删除file1,对file…...
界面组件Telerik ThemeBuilder R1 2023开创应用主题研发新方式!
Telerik DevCraft包含一个完整的产品栈来构建您下一个Web、移动和桌面应用程序。它使用HTML和每个.NET平台的UI库,加快开发速度。Telerik DevCraft提供最完整的工具箱,用于构建现代和面向未来的业务应用程序,目前提供UI for ASP.NET包含一个完…...
在FederatedScope 如何查看clientserver之间的传递的参数大小(通讯量)? 对源码的探索记录
在FederatedScope 如何查看client/server之间的传递的参数大小(通讯量)? 对源码的探索记录 背景需求 想给自己的论文补一个通讯开销对比实验:需要计算出client和server之间传递的信息(例如,模型权重、embedding)总共…...
2023爱分析 · 数据科学与机器学习平台厂商全景报告 | 爱分析报告
报告编委 黄勇 爱分析合伙人&首席分析师 孟晨静 爱分析分析师 目录 1. 研究范围定义 2. 厂商全景地图 3. 市场分析与厂商评估 4. 入选厂商列表 1. 研究范围定义 研究范围 经济新常态下,如何对海量数据进行分析挖掘以支撑敏捷决策、适应市场的快…...
20230215_数据库过程_高质量发展
高质量发展 —一、运营结果 SQL_STRING:‘delete shzc.np_rec_lnpdb a where exists (select * from tbcs.v_np_rec_lnpdbbcv t where a.telnumt.telnum and a.outcarriert.OUTCARRIER and a.incarriert.INCARRIER and a.owncarriert.OWNCARRIER and a.starttimet.STARTTIME …...
【百度 JavaScript API v3.0】LocalSearch 位置检索、Autocomplete 结果提示
地名检索移动到指定坐标 需求 在输入框中搜索,在下拉列表中浮动,右侧出现高亮的列表集。选中之后移动到指定坐标。 技术点 官网地址: JavaScript API - 快速入门 | 百度地图API SDK 开发文档:百度地图JSAPI 3.0类参考 实现 …...
运用Facebook投放,如何制定有效的竞价策略?
广告投放中,我们经常会遇到一个问题,就是不知道什么样的广告适合自己的业务。其实,最简单的方法就是根据我们业务本身进行定位并进行投放。当你了解了广告主所处行业及目标受众后,接下来会针对目标市场进行搜索和定位(…...
大数据框架之Hadoop:HDFS(五)NameNode和SecondaryNameNode(面试开发重点)
5.1NN和2NN工作机制 5.1.1思考:NameNode中的元数据是存储在哪里的? 首先,我们做个假设,如果存储在NameNode节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此&am…...
计算机网络 - 1. 体系结构
目录概念、功能、组成、分类概念功能组成分类分层结构概念总结OSI 七层模型应用层表示层会话层传输层网络层数据链路层物理层TCP/IP 四层模型OSI 与 TCP/IP 相同点OSI 与 TCP/IP 不同点为什么 TCP/IP 去除了表示层和会话层五层参考模型概念、功能、组成、分类 概念 …...
银行业上云进行时,OLAP 云服务如何解决传统数仓之痛?
本文节选自《中国金融科技发展概览:创新与应用前沿》,从某国有大行构建大数据云平台的实践出发,解读了 OLAP 云服务如何助力银行实现技术平台化、组件化和云服务化,降低技术应用门槛,赋能业务创新。此外,本…...
特定领域知识图谱融合方案:文本匹配算法之预训练Simbert、ERNIE-Gram单塔模型等诸多模型【三】
特定领域知识图谱融合方案:文本匹配算法之预训练模型SimBert、ERNIE-Gram 文本匹配任务在自然语言处理中是非常重要的基础任务之一,一般研究两段文本之间的关系。有很多应用场景;如信息检索、问答系统、智能对话、文本鉴别、智能推荐、文本数据去重、文本相似度计算、自然语…...
【2023最新教程】从0到1开发自动化测试框架(0基础也能看懂)
一、序言 随着项目版本的快速迭代、APP测试有以下几个特点: 首先,功能点多且细,测试工作量大,容易遗漏;其次,代码模块常改动,回归测试很频繁,测试重复低效;最后&#x…...
linux备份命令小记 —— 筑梦之路
Linux dump命令用于备份文件系统。 dump为备份工具程序,可将目录或整个文件系统备份至指定的设备,或备份成一个大文件。 dump命令只可以备份ext2/3/4格式的文件系统, centos7默认未安装dump命令,可以使用yum install -y dump安…...
vue项目(vue-cli)配置环境变量和打包时区分开发、测试、生产环境
1.打包时区分不同环境在自定义配置Vue-cli 的过程中,想分别通过.env.development .env.test .env.production 来代表开发、测试、生产环境。NODE_ENVdevelopment NODE_ENVtest NODE_ENVproduction本来想使用上面三种配置来区分三个环境,但是发现使用test…...
Python 命名规范
Python 命名规范 基本规范 类型公有内部备注Packagepackage_namenone全小写下划线式驼峰Modulemodule_name_module_name全小写下划线式驼峰ClassClassName_ClassName首字母大写式驼峰Methodmethod_nameprotected: _method_name private: __method_name全小写下划线式驼峰Exce…...
CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...
数据库正常,但后端收不到数据原因及解决
从代码和日志来看,后端SQL查询确实返回了数据,但最终user对象却为null。这表明查询结果没有正确映射到User对象上。 在前后端分离,并且ai辅助开发的时候,很容易出现前后端变量名不一致情况,还不报错,只是单…...
