当前位置: 首页 > news >正文

Seata源码学习(三)-2PC核心源码解读

Seata源码分析-2PC核心源码解读

2PC提交源码流程

上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器,一旦执行拦截器,我们就会进入到其中的invoke方法,在这其中会做一些@GlobalTransactional注解的判断,如果有注解以后,会执行全局事务和全局锁,那么在执行全局事务的时候会调用handleGlobalTransaction全局事务处理器,这里主要是获取事务信息

Object handleGlobalTransaction(final MethodInvocation methodInvocation,final GlobalTransactional globalTrxAnno) throws Throwable {boolean succeed = true;try {return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {return methodInvocation.proceed();}// 获取事务名称,默认获取方法名public String name() {String name = globalTrxAnno.name();if (!StringUtils.isNullOrEmpty(name)) {return name;}return formatMethod(methodInvocation.getMethod());}/*** 解析GlobalTransactional注解属性,封装为对象* @return*/@Overridepublic TransactionInfo getTransactionInfo() {// reset the value of timeout// 获取超时时间,默认60秒int timeout = globalTrxAnno.timeoutMills();if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}// 构建事务信息对象TransactionInfo transactionInfo = new TransactionInfo();transactionInfo.setTimeOut(timeout);// 超时时间transactionInfo.setName(name()); // 事务名称transactionInfo.setPropagation(globalTrxAnno.propagation());// 事务传播transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());// 校验或占用全局锁重试间隔transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());// 校验或占用全局锁重试次数Set<RollbackRule> rollbackRules = new LinkedHashSet<>();// 其他构建信息for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : globalTrxAnno.rollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : globalTrxAnno.noRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {// 执行异常TransactionalExecutor.Code code = e.getCode();switch (code) {case RollbackDone:throw e.getOriginalException();case BeginFailure:succeed = false;failureHandler.onBeginFailure(e.getTransaction(), e.getCause());throw e.getCause();case CommitFailure:succeed = false;failureHandler.onCommitFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackFailure:failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();case RollbackRetrying:failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();default:throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));}} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}
}

在这其中,我们要关注一个重点方法execute()

其实这个方法主要的作用就是,执行事务的流程,大概一下几点:

  1. 获取事务信息
  2. 开始执行全局事务
  3. 发生异常全局回滚,各个数据通过undo_log表进行事务补偿
  4. 全局事务提交
  5. 清除所有资源

这个位置是非常核心的一个位置,因为我们所有的业务进来以后都会走这个位置。

这其中的第三步和第四步就是在想TC(Seata-Server)发起全局事务的提交/回滚

public Object execute(TransactionalExecutor business) throws Throwable {// 1. Get transactionInfo// 获取事务信息TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.// 获取当前事务,主要获取XidGlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.// 根据配置的不同事务传播行为,执行不同的逻辑Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {case NOT_SUPPORTED:// If transaction is existing, suspend it.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();}// Execute without transaction and return.return business.execute();case REQUIRES_NEW:// If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();tx = GlobalTransactionContext.createNew();}// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.if (notExistingTransaction(tx)) {return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,// else continue and execute with new transaction.break;case NEVER:// If transaction is existing, throw exception.if (existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));} else {// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.// 当前没有事务,则创建一个新的事务if (tx == null) {tx = GlobalTransactionContext.createNew();}// set current tx config to holderGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,//    else do nothing. Of course, the hooks will still be triggered.// 开始执行全局事务beginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 执行当前业务逻辑:// 1. 在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据// 2. 执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log表中// 3. 远程调用其他应用,远程应用接收到xid,也会注册分支事务,写入branch_table及本地undo_log表// 4. 会在lock_table表中插入全局锁数据(一个分支一条)rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 发生异常全局回滚,各个数据通过undo_log表进行事务补偿completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.// 全局提交事务commitTransaction(tx);return rs;} finally {//5. clear// 清除所有资源resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}
}

如何发起全局事务

这个位置我们就看当前这个代码中的 beginTransaction(txInfo, tx);方法

// 想TC发起请求,这里采用了模板模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {triggerBeforeBegin();// 对TC发起请求tx.begin(txInfo.getTimeOut(), txInfo.getName());triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);}
}	

那我们向下来看begin方法,那要注意,这里调用begin方法的是DefaultGlobalTransaction

@Override
public void begin(int timeout, String name) throws TransactionException {//判断调用者是否是TMif (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}assertXIDNull();String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}// 获取Xidxid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}
}

在向下来看begin方法,这时候使用的是(默认事务管理者)DefaultTransactionManager.begin,来真正的获取xid,其中就是传入事务的相关信息,最终TC端返回对应的全局事务Xid。

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);// 发送请求得到响应GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}//返回Xidreturn response.getXid();
}

这里采用的是Netty的通讯方式

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {// 通过Netty发送请求return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);} catch (TimeoutException toe) {throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);}
}

图解地址:https://www.processon.com/view/link/6213d58f1e0853078013c58f

相关文章:

Seata源码学习(三)-2PC核心源码解读

Seata源码分析-2PC核心源码解读 2PC提交源码流程 上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器&#xff0c;一旦执行拦截器&#xff0c;我们就会进入到其中的invoke方法&#xff0c;在这其中会做一些GlobalTransactional注解的判断&#xff0c;如果有注解…...

IO流概述

&#x1f3e1;个人主页 &#xff1a; 守夜人st &#x1f680;系列专栏&#xff1a;Java …持续更新中敬请关注… &#x1f649;博主简介&#xff1a;软件工程专业&#xff0c;在校学生&#xff0c;写博客是为了总结回顾一些所学知识点 目录IO流概述IO 流的分类总结流的四大类字…...

【node.js】node.js的安装和配置

文章目录前言下载和安装Path环境变量测试推荐插件总结前言 Node.js是一个在服务器端可以解析和执行JavaScript代码的运行环境&#xff0c;也可以说是一个运行时平台&#xff0c;仍然使用JavaScript作为开发语言&#xff0c;但是提供了一些功能性的API。 下载和安装 Node.js的官…...

Python优化算法—遗传算法

Python优化算法—遗传算法一、前言二、安装三、遗传算法3.1 自定义函数3.2 遗传算法进行整数规划3.3 遗传算法用于旅行商问题3.4 使用遗传算法进行曲线拟合一、前言 优化算法&#xff0c;尤其是启发式的仿生智能算法在最近很火&#xff0c;它适用于解决管理学&#xff0c;运筹…...

数据埋点(Data buried point)的应用价值剖析

一、什么是数据埋点&#xff1f;数据埋点指在应用中特定的流程中收集一些信息&#xff0c;用来跟踪应用使用的状况&#xff0c;后续用来进一步优化产品或是提供运营的数据支撑。比如访问数&#xff08;Visits&#xff09;&#xff0c;访客数(Visitor&#xff09;&#xff0c;停…...

一文弄懂硬链接、软链接、复制的区别

复制 命令&#xff1a;cp file1 file2 作用&#xff1a;实现对file1的一个拷贝。 限制&#xff1a;可以跨分区&#xff0c;文件夹有效。 效果&#xff1a;修改file1&#xff0c;对file2无影响&#xff1b;修改file2&#xff0c;对file1无影响。删除file1&#xff0c;对file…...

界面组件Telerik ThemeBuilder R1 2023开创应用主题研发新方式!

Telerik DevCraft包含一个完整的产品栈来构建您下一个Web、移动和桌面应用程序。它使用HTML和每个.NET平台的UI库&#xff0c;加快开发速度。Telerik DevCraft提供最完整的工具箱&#xff0c;用于构建现代和面向未来的业务应用程序&#xff0c;目前提供UI for ASP.NET包含一个完…...

在FederatedScope 如何查看clientserver之间的传递的参数大小(通讯量)? 对源码的探索记录

在FederatedScope 如何查看client/server之间的传递的参数大小&#xff08;通讯量&#xff09;&#xff1f; 对源码的探索记录 背景需求 想给自己的论文补一个通讯开销对比实验&#xff1a;需要计算出client和server之间传递的信息(例如&#xff0c;模型权重、embedding)总共…...

2023爱分析 · 数据科学与机器学习平台厂商全景报告 | 爱分析报告

报告编委 黄勇 爱分析合伙人&首席分析师 孟晨静 爱分析分析师 目录 1. 研究范围定义 2. 厂商全景地图 3. 市场分析与厂商评估 4. 入选厂商列表 1. 研究范围定义 研究范围 经济新常态下&#xff0c;如何对海量数据进行分析挖掘以支撑敏捷决策、适应市场的快…...

20230215_数据库过程_高质量发展

高质量发展 —一、运营结果 SQL_STRING:‘delete shzc.np_rec_lnpdb a where exists (select * from tbcs.v_np_rec_lnpdbbcv t where a.telnumt.telnum and a.outcarriert.OUTCARRIER and a.incarriert.INCARRIER and a.owncarriert.OWNCARRIER and a.starttimet.STARTTIME …...

【百度 JavaScript API v3.0】LocalSearch 位置检索、Autocomplete 结果提示

地名检索移动到指定坐标 需求 在输入框中搜索&#xff0c;在下拉列表中浮动&#xff0c;右侧出现高亮的列表集。选中之后移动到指定坐标。 技术点 官网地址&#xff1a; JavaScript API - 快速入门 | 百度地图API SDK 开发文档&#xff1a;百度地图JSAPI 3.0类参考 实现 …...

运用Facebook投放,如何制定有效的竞价策略?

广告投放中&#xff0c;我们经常会遇到一个问题&#xff0c;就是不知道什么样的广告适合自己的业务。其实&#xff0c;最简单的方法就是根据我们业务本身进行定位并进行投放。当你了解了广告主所处行业及目标受众后&#xff0c;接下来会针对目标市场进行搜索和定位&#xff08;…...

大数据框架之Hadoop:HDFS(五)NameNode和SecondaryNameNode(面试开发重点)

5.1NN和2NN工作机制 5.1.1思考&#xff1a;NameNode中的元数据是存储在哪里的&#xff1f; 首先&#xff0c;我们做个假设&#xff0c;如果存储在NameNode节点的磁盘中&#xff0c;因为经常需要进行随机访问&#xff0c;还有响应客户请求&#xff0c;必然是效率过低。因此&am…...

计算机网络 - 1. 体系结构

目录概念、功能、组成、分类概念功能组成分类分层结构概念总结OSI 七层模型应用层表示层会话层传输层网络层数据链路层物理层TCP/IP 四层模型OSI 与 TCP/IP 相同点OSI 与 TCP/IP 不同点为什么 TCP/IP 去除了表示层和会话层五层参考模型概念、功能、组成、分类 概念 &#x1f…...

银行业上云进行时,OLAP 云服务如何解决传统数仓之痛?

本文节选自《中国金融科技发展概览&#xff1a;创新与应用前沿》&#xff0c;从某国有大行构建大数据云平台的实践出发&#xff0c;解读了 OLAP 云服务如何助力银行实现技术平台化、组件化和云服务化&#xff0c;降低技术应用门槛&#xff0c;赋能业务创新。此外&#xff0c;本…...

特定领域知识图谱融合方案:文本匹配算法之预训练Simbert、ERNIE-Gram单塔模型等诸多模型【三】

特定领域知识图谱融合方案:文本匹配算法之预训练模型SimBert、ERNIE-Gram 文本匹配任务在自然语言处理中是非常重要的基础任务之一,一般研究两段文本之间的关系。有很多应用场景;如信息检索、问答系统、智能对话、文本鉴别、智能推荐、文本数据去重、文本相似度计算、自然语…...

【2023最新教程】从0到1开发自动化测试框架(0基础也能看懂)

一、序言 随着项目版本的快速迭代、APP测试有以下几个特点&#xff1a; 首先&#xff0c;功能点多且细&#xff0c;测试工作量大&#xff0c;容易遗漏&#xff1b;其次&#xff0c;代码模块常改动&#xff0c;回归测试很频繁&#xff0c;测试重复低效&#xff1b;最后&#x…...

linux备份命令小记 —— 筑梦之路

Linux dump命令用于备份文件系统。 dump为备份工具程序&#xff0c;可将目录或整个文件系统备份至指定的设备&#xff0c;或备份成一个大文件。 dump命令只可以备份ext2/3/4格式的文件系统&#xff0c; centos7默认未安装dump命令&#xff0c;可以使用yum install -y dump安…...

vue项目(vue-cli)配置环境变量和打包时区分开发、测试、生产环境

1.打包时区分不同环境在自定义配置Vue-cli 的过程中&#xff0c;想分别通过.env.development .env.test .env.production 来代表开发、测试、生产环境。NODE_ENVdevelopment NODE_ENVtest NODE_ENVproduction本来想使用上面三种配置来区分三个环境&#xff0c;但是发现使用test…...

Python 命名规范

Python 命名规范 基本规范 类型公有内部备注Packagepackage_namenone全小写下划线式驼峰Modulemodule_name_module_name全小写下划线式驼峰ClassClassName_ClassName首字母大写式驼峰Methodmethod_nameprotected: _method_name private: __method_name全小写下划线式驼峰Exce…...

操作系统——2.操作系统的特征

这篇文章&#xff0c;我们来讲一讲操作系统的特征 目录 1.概述 2.并发 2.1并发概念 2.1.1操作系统的并发性 3.共享 3.1共享的概念 3.2共享的方式 4.并发和共享的关系 5.虚拟 5.1虚拟的概念 5.2虚拟小结 6.异步 6.1异步概念 7.小结 1.概述 上一篇文章&#xff0c;我们…...

【计算机网络期末复习】第六章 应用层

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4e3;专栏定位&#xff1a;为想复习学校计算机网络课程的同学提供重点大纲&#xff0c;帮助大家渡过期末考~ &#x1f4da;专栏地址&#xff1a;https://blog.csdn.net/Newin2020/arti…...

TypeScript基本教程

TS是JS的超集&#xff0c;所以JS基础的类型都包含在内 起步安装 npm install typescript -g运行tsc 文件名 基础类型 Boolean、Number、String、null、undefined 以及 ES6 的 Symbol 和 ES10 的 BigInt。 1 字符串类型 字符串是使用string定义的 let a: string 123 //普…...

使用Windows API实现本地音频采集

Windows API提供了Winmm&#xff08;Windows多媒体&#xff09;库&#xff0c;其中包括了音频设备相关的函数&#xff0c;可以用来实现音频设备的枚举和测试。 下面是一个简单的示例代码&#xff0c;演示了如何使用Winmm库中的waveInGetNumDevs()函数来枚举计算机上的音频输入…...

实用的费曼学习法 | 一些思考

文章目录 一、前言二、费曼学习法CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 大数据与人工智能背景下,最重要的是:捕捉机会和快速学习的能力 一、前言 费曼学习法是美国著名的物理学家,理查德 ∙ \bullet ∙ 费曼总结出来的学习方法。 这个方法的核心是:当你学习了…...

Linux安装Docker配置docker-compose 编排工具【超详细】

一、介绍Docker Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中&#xff0c;然后发布到任何流行的 Linux或Windows操作系统的机器上&#xff0c;也可以实现虚拟化。容器是完全使用沙箱机制&#xff0c;相互之间不会有…...

iTerm2 + Oh My Zsh 打造舒适终端体验

最终效果图&#xff1a; 因为powerline以及homebrew均需要安装command line tool&#xff0c;网络条件优越的同学在执行本文下面内容之前&#xff0c;可以先安装XCode并打开运行一次&#xff08;会初始化安装components&#xff09;&#xff0c;省去以后在iterm2中的等待时间。…...

【scipy.sparse】diags()和dia_matrix()的区别

【scipy.sparse】diags()和dia_matrix()的区别 文章目录【scipy.sparse】diags()和dia_matrix()的区别1. 介绍2. 代码示例2.1 sp.diags()2.1.1 第一种用法&#xff08;dataoffsets&#xff09;2.1.2 广播&#xff08;需要指定shape&#xff09;2.1.3 只有一条对角线2.2 sp.dia_…...

java ssm自行车在线租赁系统idea

当前自行车在社会上广泛使用,但自行车的短距离仍旧不能完全满足广大用户的需求。自行车在线租赁系统可以为用户提供租赁用车等功能,拥有较好的用户体验.能实时在线租赁提供更加快捷方便的租车方式,解决了常见自行车在线租赁系统较为局限的自行车归还功能。 通过使用本系统&…...

GAN和CycleGAN

文章目录1. GAN 《Generative Adversarial Nets》1.1 相关概念1.2 公式理解1.3 图片理解1.4 熵、交叉熵、KL散度、JS散度1.5 其他相关&#xff08;正在补充&#xff01;&#xff09;2. Cycle GAN 《Unpaired Image-to-Image Translation using Cycle-Consistent Adversarial Ne…...

杭州做网站怎么收费多少/合肥搜索引擎推广

什么是 Penpot &#xff1f; Penpot 是第一个面向跨域团队的开源设计和原型制作平台。 Penpot 是基于 Web 的&#xff0c;不依赖于操作系统&#xff0c;使用开放式 Web 标准&#xff08;SVG&#xff09;&#xff0c;为所有人提供服务并由开源社区授权。 官网地址&#xff1a;ht…...

如何做类似优酷的视频网站/推广专员

一讲到工作流&#xff0c;很多人第一反应就是这个东西很深奥&#xff0c;有时候又觉得离我们较为遥远&#xff0c;确实完善的工作流设计很多方面&#xff0c;而正是由于需要兼顾很多方面&#xff0c;一般通用的工作流都难做到尽善尽美。微软也提供了几个版本的WF框架支持&#…...

做网站要用到数据库吗/软文街官方网站

2019独角兽企业重金招聘Python工程师标准>>> <!-- 关键字&#xff0c;搜所引擎 SEO --> <meta http-equiv"keywords" content"关键字1,关键字2,..."> <!-- 页面描述 --> <meta http-equiv"description" conte…...

北京市住房城乡建设厅网站/网站排名优化师

根据返回的错误代码判断3.1 错误代码&#xff1a;450 4.7.1 Client host rejected: cannot find your hostname错误原因&#xff1a;对方服务器未设置反向解析处理方式&#xff1a;请联络IDC进行处理。可参考中国互联网协会反垃圾中心提供的设置方式&#xff1a;http://www.an…...

重庆做网站价格/精准数据营销方案

今天小编跟大家讲解下有关html5使用canvas画空心圆与实心圆 &#xff0c;相信小伙伴们对这个话题应该有所关注吧&#xff0c;小编也收集到了有关html5使用canvas画空心圆与实心圆 的相关资料&#xff0c;希望小伙伴们看了有所帮助。这里给大家分享的是一个学习canvas的时候做的…...

建设摩托车公司官方网站/济南网站万词优化

计算机硬件对过程的支持 基本概念&#xff1a; 过程&#xff08;procedure&#xff09;或函数是程序员进行结构化编程的工具&#xff0c;两者均可以提高程序的可理解性和代码的重用性。过程允许程序员将精力集中在任务的一部分&#xff0c;由于参数能传递数值并返回结果&…...