当前位置: 首页 > news >正文

分布式事务 | 使用DTM 的Saga 模式

DTM 简介


前面章节提及的MassTransit、dotnetcore/CAP都提供了分布式事务的处理能力,但也仅局限于Saga和本地消息表模式的实现。那有没有一个独立的分布式事务解决方案,涵盖多种分布式事务处理模式,如Saga、TCC、XA模式等。有,目前业界主要有两种开源方案,其一是阿里开源的Seata,另一个就是DTM。其中Seata仅支持Java、Go和Python语言,因此不在.NET 的选择范围。DTM则通过提供简单易用的HTTP和gRPC接口,屏蔽了语言的无关性,因此支持任何开发语言接入,目前提供了Go、Python、NodeJs、Ruby、Java和C#等语言的SDK。

DTM,全称Distributed Transaction Manager,是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。它提供了Saga、TCC、 XA和二阶段消息模式以满足不同应用场景的需求,同时其首创的子事务屏障技术可以有效解决幂等、悬挂和空补偿等异常问题。

DTM 事务处理过程及架构


那DTM是如何处理分布式事务的呢?以一个经典的跨行转账业务为例来看下事务处理过程。对于跨行转账业务而言,很显然是跨库跨服务的应用场景,不能简单通过本地事务解决,可以使用Saga模式,以下是基于DTM提供的Saga事务模式成功转账的的时序图:

从以上时序图可以看出,DTM整个全局事务分为如下几步:

  1. 用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回

  1. DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回

  1. DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回

  1. DTM已完成所有的事务分支,将全局事务的状态修改为已完成

基于以上这个时序图的基础上,再来看下DTM的架构:

整个DTM架构中,一共有三个角色,分别承担了不同的职责:

  • RM-资源管理器:RM是一个应用服务,通常连接到独立的数据库,负责处理全局事务中的本地事务,执行相关数据的修改、提交、回滚、补偿等操作。例如在前面的这个Saga事务时序图中,步骤2、3中被调用的TransIn和TransOut方法所在的服务都是RM。

  • AP-应用程序:AP是一个应用服务,负责全局事务的编排,他会注册全局事务,注册子事务,调用RM接口。例如在前面的这个SAGA事务中,发起步骤1的是AP,它编排了一个包含TransOut、TransIn的全局事务,然后提交给TM

  • TM-事务管理器:TM就是DTM服务,负责全局事务的管理,作为一个独立的服务而存在。每个全局事务都注册到TM,每个事务分支也注册到TM。TM会协调所有的RM来执行不同的事务分支,并根据执行结果决定是否提交或回滚事务。例如在前面的Saga事务时序图中,TM在步骤2、3中调用了各个RM,在步骤4中,完成这个全局事务。

总体而言,AP-应用程序充当全局事务编排器的角色通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。TM-事务管理器接收到注册的全局事务和子事务后,负责调用RM-资源管理器来执行对应的事务分支,TM-事务管理器根据事务分支的执行结果决定是否提及或回滚事务。

快速上手


百闻不如一见,接下来就来实际上手体验下如何基于DTM来实际应用Saga进行分布式跨行转账事务的处理。

创建示例项目


接下来就来创建一个示例项目:

  1. 使用dotnet new webapi -n DtmDemo.Webapi创建示例项目。

  1. 添加Nuget包:DtmcliPomelo.EntityFrameworkCore.MySql

  1. 添加DTM配置项:

{"dtm": {"DtmUrl": "http://localhost:36789","DtmTimeout": 10000,"BranchTimeout": 10000,"DBType": "mysql","BarrierTableName": "dtm_barrier.barrier",}
}
  1. 定义银行账户BankAccount实体类:

namespaceDtmDemo.WebApi.Models
{publicclassBankAccount{publicint Id { get; set; }publicdecimal Balance { get; set; }}
}
  1. 定义DtmDemoWebApiContext数据库上下文:

using Microsoft.EntityFrameworkCore;namespaceDtmDemo.WebApi.Data
{publicclassDtmDemoWebApiContext : DbContext{publicDtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options): base(options){}public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!;}
}
  1. 注册DbContext 和DTM服务:

using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using Dtmcli;var builder = WebApplication.CreateBuilder(args);
var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext");
// 注册DbContext
builder.Services.AddDbContext<DtmDemoWebApiContext>(options =>
{options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));
});// 注册DTM
builder.Services.AddDtmcli(builder.Configuration, "dtm");
  1. 执行dotnet ef migrations add 'Initial' 创建迁移。

  1. 为便于初始化演示数据,定义BankAccountController如下,其中PostBankAccount接口添加了await _context.Database.MigrateAsync();用于自动应用迁移。

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;namespaceDtmDemo.WebApi.Controllers
{[Route("api/[controller]")][ApiController]publicclassBankAccountsController : ControllerBase{privatereadonly DtmDemoWebApiContext _context;publicBankAccountsController(DtmDemoWebApiContext context){_context = context;}[HttpGet]publicasync Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount(){returnawait _context.BankAccount.ToListAsync();}[HttpPost]publicasync Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount){await _context.Database.MigrateAsync();_context.BankAccount.Add(bankAccount);await _context.SaveChangesAsync();return Ok(bankAccount);}
}

应用Saga模式


接下来定义SagaDemoController来使用DTM的Saga模式来模拟跨行转账分布式事务:

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
using DtmCommon;namespaceDtmDemo.WebApi.Controllers
{[Route("api/[controller]")][ApiController]publicclassSagaDemoController : ControllerBase{privatereadonly DtmDemoWebApiContext _context;privatereadonly IConfiguration _configuration;privatereadonly IDtmClient _dtmClient;privatereadonly IDtmTransFactory _transFactory;privatereadonly IBranchBarrierFactory _barrierFactory;privatereadonly ILogger<BankAccountsController> _logger;publicSagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory){this._context = context;this._configuration = configuration;this._dtmClient = dtmClient;this._transFactory = transFactory;this._logger = logger;this._barrierFactory = barrierFactory;}
}

对于跨行转账业务,使用DTM的Saga模式,首先要进行事务拆分,可以拆分为以下4个子事务,并分别实现:

转出子事务(TransferOut)


    [HttpPost("TransferOut")]publicasync Task<IActionResult> TransferOut([FromBody] TransferRequest request){var msg = $"用户{request.UserId}转出{request.Amount}元";_logger.LogInformation($"转出子事务-启动:{msg}");// 1. 创建子事务屏障var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);try{using (var conn = _context.Database.GetDbConnection()){// 2. 在子事务屏障内执行事务操作await branchBarrier.Call(conn, async (tx) =>{_logger.LogInformation($"转出子事务-执行:{msg}");await _context.Database.UseTransactionAsync(tx);var bankAccount = await _context.BankAccount.FindAsync(request.UserId);if (bankAccount == null || bankAccount.Balance < request.Amount)thrownew InvalidDataException("账户不存在或余额不足!");bankAccount.Balance -= request.Amount;await _context.SaveChangesAsync();});}}catch (InvalidDataException ex){_logger.LogInformation($"转出子事务-失败:{ex.Message}");// 3. 按照接口协议,返回409,以表示子事务失败returnnew StatusCodeResult(StatusCodes.Status409Conflict);}_logger.LogInformation($"转出子事务-成功:{msg}");return Ok();}

以上代码中有几点需要额外注意:

  1. 使用Saga模式,必须开启子事务屏障:_barrierFactory.CreateBranchBarrier(Request.Query),其中Request.Query中的参数由DTM 生成,类似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga,主要包含四个参数:

  1. gid:全局事务Id

  1. trans_type:事务类型,是saga、msg、xa或者是tcc。

  1. branch_id:子事务的Id

  1. op:当前操作,对于Saga事务模式,要么为action(正向操作),要么为compensate(补偿操作)。

  1. 必须在子事务屏障内执行事务操作:branchBarrier.Call(conn, async (tx) =>{}

  1. 对于Saga正向操作而言,业务上的失败与异常是需要做严格区分的,例如前面的余额不足,是业务上的失败,必须回滚。而对于网络抖动等其他外界原因导致的事务失败,属于业务异常,则需要重试。因此若因业务失败(这里是账户不存在或余额不足)而导致子事务失败,则必须通过抛异常的方式并返回**409**状态码以告知DTM 子事务失败。

  1. 以上通过抛出异常的方式中断子事务执行并在外围捕获特定异常返回409状态码。在外围捕获异常时切忌放大异常捕获,比如直接catch(Exception),如此会捕获由于网络等其他原因导致的异常,而导致DTM 不再自动处理该异常,比如业务异常时的自动重试。

转出补偿子事务(TransferOut_Compensate)


转出补偿,就是回滚转出操作,进行账户余额归还,实现如下:

    [HttpPost("TransferOut_Compensate")]publicasync Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request){var msg = $"用户{request.UserId}回滚转出{request.Amount}元";_logger.LogInformation($"转出补偿子事务-启动:{msg}");// 1. 创建子事务屏障var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);using (var conn = _context.Database.GetDbConnection()){// 在子事务屏障内执行事务操作await branchBarrier.Call(conn, async (tx) =>{_logger.LogInformation($"转出补偿子事务-执行:{msg}");await _context.Database.UseTransactionAsync(tx);var bankAccount = await _context.BankAccount.FindAsync(request.UserId);if (bankAccount == null)return; //对于补偿操作,可直接返回,中断后续操作bankAccount.Balance += request.Amount;await _context.SaveChangesAsync();});}_logger.LogInformation($"转出补偿子事务-成功!");// 2. 因补偿操作必须成功,所以必须返回200。return Ok();}

由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。

另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回**200**因此当出现bankAccount==null时可以直接 return。

转入子事务(TransferIn)


转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并通过抛异常的方式并最终返回409状态码来显式告知DTM 子事务执行失败。

    [HttpPost("TransferIn")]publicasync Task<IActionResult> TransferIn([FromBody] TransferRequest request){var msg = $"用户{request.UserId}转入{request.Amount}元";_logger.LogInformation($"转入子事务-启动:{msg}");var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);try{using (var conn = _context.Database.GetDbConnection()){await branchBarrier.Call(conn, async (tx) =>{_logger.LogInformation($"转入子事务-执行:{msg}");await _context.Database.UseTransactionAsync(tx);var bankAccount = await _context.BankAccount.FindAsync(request.UserId);if (bankAccount == null)thrownew InvalidDataException("账户不存在!");bankAccount.Balance += request.Amount;await _context.SaveChangesAsync();});}}catch (InvalidDataException ex){_logger.LogInformation($"转入子事务-失败:{ex.Message}");returnnew StatusCodeResult(StatusCodes.Status409Conflict);}_logger.LogInformation($"转入子事务-成功:{msg}");return Ok();}

转入补偿子事务(TransferIn_Compensate)


转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并最终返回200状态码来告知DTM 补偿子事务执行成功。

    [HttpPost("TransferIn_Compensate")]publicasync Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request){var msg = "用户{request.UserId}回滚转入{request.Amount}元";_logger.LogInformation($"转入补偿子事务-启动:{msg}");var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);using (var conn = _context.Database.GetDbConnection()){await branchBarrier.Call(conn, async (tx) =>{_logger.LogInformation($"转入补偿子事务-执行:{msg}");await _context.Database.UseTransactionAsync(tx);var bankAccount = await _context.BankAccount.FindAsync(request.UserId);if (bankAccount == null) return;bankAccount.Balance -= request.Amount;await _context.SaveChangesAsync();});}_logger.LogInformation($"转入补偿子事务-成功!");return Ok();}

编排Saga事务


拆分完子事务,最后就可以进行Saga事务编排了,其代码如下所示:

    [HttpPost("Transfer")]publicasync Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,CancellationToken cancellationToken){try{_logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}");//1. 生成全局事务IDvar gid = await _dtmClient.GenGid(cancellationToken);var bizUrl = _configuration.GetValue<string>("TransferBaseURL");//2. 创建Sagavar saga = _transFactory.NewSaga(gid);//3. 添加子事务saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",new TransferRequest(fromUserId, amount)).Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",new TransferRequest(toUserId, amount)).EnableWaitResult(); // 4. 按需启用是否等待事务执行结果//5. 提交Saga事务await saga.Submit(cancellationToken);}catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。{_logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!");returnnew BadRequestObjectResult($"转账失败:{ex.Message}");}_logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");}

主要步骤如下:

  1. 生成全局事务Id:var gid =await _dtmClient.GenGid(cancellationToken);

  1. 创建Saga全局事务:_transFactory.NewSaga(gid);

  1. 添加子事务:saga.Add(string action, string compensate, object postData);包含正向和反向子事务。

  1. 如果依赖事务执行结果,可通过EnableWaitResult()开启事务结果等待。

  1. 提交Saga全局事务:saga.Submit(cancellationToken);

  1. 若开启了事务结果等待,可以通过try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。

运行项目


既然DTM作为一个独立的服务存在,其负责通过HTTP或gRPC协议发起子事务的调用,因此首先需要启动一个DTM实例,又由于本项目依赖MySQL,因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过右键项目->Add->Docker Support->Linux 即可添加Dockerfile如下所示:

FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
COPY . .
WORKDIR "/src/DtmDemo.WebApi"
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/buildFROM build AS publish
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publishFROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]

在Visual Studio中通过右键项目->Add Container Orchestrator Support->Docker Compose即可添加docker-compose.yml,由于整个项目依赖mysql和DTM,修改docker-compose.yml如下所示,其中定义了三个服务:db,dtm和dtmdemo.webapi。

version:'3.4'services:db:image:'mysql:5.7'container_name:dtm-mysqlenvironment:MYSQL_ROOT_PASSWORD:123456# 指定MySQL初始密码volumes:-./docker/mysql/scripts:/docker-entrypoint-initdb.d# 挂载用于初始化数据库的脚本ports:-'3306:3306'dtm:depends_on: ["db"]image:'yedf/dtm:latest'container_name:dtm-svcenvironment:IS_DOCKER:'1'STORE_DRIVER:mysql# 指定使用MySQL持久化DTM事务数据STORE_HOST:db# 指定MySQL服务名,这里是dbSTORE_USER:rootSTORE_PASSWORD:'123456'STORE_PORT:3306STORE_DB:"dtm"# 指定DTM 数据库名ports:-'36789:36789'# DTM HTTP 端口-'36790:36790'# DTM gRPC 端口dtmdemo.webapi:depends_on: ["dtm", "db"]image:${DOCKER_REGISTRY-}dtmdemowebapienvironment:ASPNETCORE_ENVIRONMENT:docker# 设定启动环境为dockercontainer_name:dtm-webapi-demobuild:context:.dockerfile:DtmDemo.WebApi/Dockerfileports:-'31293:80'# 映射Demo:80端口到本地31293端口-'31294:443'	 # 映射Demo:443端口到本地31294端口

其中dtmdemo.webapi服务通过ASPNETCORE_ENVIRONMENT: docker指定启动环境为docker,因此需要在项目下添加appsettings.docker.json以配置应用参数:

{"ConnectionStrings": {"DtmDemoWebApiContext":"Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"},"TransferBaseURL":"http://dtmdemo.webapi/api/SagaDemo","dtm": {"DtmUrl":"http://dtm:36789","DtmTimeout":10000,"BranchTimeout":10000,"DBType":"mysql","BarrierTableName":"dtm_barrier.barrier"}
}

另外db服务中通过volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]来挂载初始化脚本,以创建DTM依赖的MySQL 存储数据库dtm和示例项目使用子事务屏障需要的barrier数据表。脚本如下:

CREATE DATABASE IF NOTEXISTS dtm
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
droptable IF EXISTS dtm.trans_global;
CREATETABLE if notEXISTS dtm.trans_global (`id` bigint(22) NOTNULL AUTO_INCREMENT,`gid` varchar(128) NOTNULL COMMENT 'global transaction id',`trans_type` varchar(45) notnull COMMENT 'transaction type: saga | xa | tcc | msg',`status` varchar(12) NOTNULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',`query_prepared` varchar(1024) NOTNULL COMMENT 'url to check for msg|workflow',`protocol` varchar(45) notnull comment 'protocol: http | grpc | json-rpc',`create_time` datetime DEFAULTNULL,`update_time` datetime DEFAULTNULL,`finish_time` datetime DEFAULTNULL,`rollback_time` datetime DEFAULTNULL,`options` varchar(1024) DEFAULT'options for transaction like: TimeoutToFail, RequestTimeout',`custom_data` varchar(1024) DEFAULT'' COMMENT 'custom data for transaction',`next_cron_interval` int(11) defaultnull comment 'next cron interval. for use of cron job',`next_cron_time` datetime defaultnull comment 'next time to process this trans. for use of cron job',`owner` varchar(128) notnulldefault'' comment 'who is locking this trans',`ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',`result` varchar(1024) DEFAULT'' COMMENT 'rollback reason for transaction',`rollback_reason` varchar(1024) DEFAULT'' COMMENT 'rollback reason for transaction',PRIMARY KEY (`id`),UNIQUE KEY `gid` (`gid`),key `owner`(`owner`),key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
droptable IF EXISTS dtm.trans_branch_op;
CREATETABLE IF NOTEXISTS dtm.trans_branch_op (`id` bigint(22) NOTNULL AUTO_INCREMENT,`gid` varchar(128) NOTNULL COMMENT 'global transaction id',`url` varchar(1024) NOTNULL COMMENT 'the url of this op',`data` TEXT COMMENT 'request body, depreceated',`bin_data` BLOB COMMENT 'request body',`branch_id` VARCHAR(128) NOTNULL COMMENT 'transaction branch ID',`op` varchar(45) NOTNULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',`status` varchar(45) NOTNULL COMMENT 'transaction op status: prepared | succeed | failed',`finish_time` datetime DEFAULTNULL,`rollback_time` datetime DEFAULTNULL,`create_time` datetime DEFAULTNULL,`update_time` datetime DEFAULTNULL,PRIMARY KEY (`id`),UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
droptable IF EXISTS dtm.kv;
CREATETABLE IF NOTEXISTS dtm.kv (`id` bigint(22) NOTNULL AUTO_INCREMENT,`cat` varchar(45) NOTNULL COMMENT 'the category of this data',`k` varchar(128) NOTNULL,`v` TEXT,`version` bigint(22) default1 COMMENT 'version of the value',create_time datetime defaultNULL,update_time datetime DEFAULTNULL,PRIMARY KEY (`id`),UNIQUE key `uniq_k`(`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
create database if notexists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
droptable if exists dtm_barrier.barrier;
createtable if notexists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,trans_type varchar(45) default'',gid varchar(128) default'',branch_id varchar(128) default'',op varchar(45) default'',barrier_id varchar(45) default'',reason varchar(45) default'' comment 'the branch type who insert this record',create_time datetime DEFAULT now(),update_time datetime DEFAULT now(),key(create_time),key(update_time),UNIQUE key(gid, branch_id, op, barrier_id)
);

准备完毕,即可通过点击Visual Studio工具栏的Docker Compose的启动按钮,启动后可以在Containers窗口看到启动了dtm-mysql、dtm-svc和dtm-webapi-demo三个容器,并在浏览器中打开了 http://localhost:31293/swagger/index.html Swagger 网页。该种方式启动项目是支持断点调试项目,如下图所示:

通过BankAccouts控制器的POST接口,初始化用户1和用户2各100元。再通过SagaDemo控制器的/api/Transfer接口,进行Saga事务测试。

  1. 用户1转账10元到用户2

由于用户1和用户2已存在,且用户1余额足够, 因此该笔转账合法因此会成功,其执行路径为:转出(成功)->转入(成功)-> 事务完成,执行日志如下图所示:

  1. 用户3转账10元到用户1

由于用户3不存在,因此执行路径为:转出(失败)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转出子事务失败,还是会调用对应的转出补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转出补偿逻辑,其中红线框住的部分就是证明。

  1. 用户1转账10元到用户3

由于用户3不存在,因此执行路径为:转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转入子事务失败,还是会调用对应的转入补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转入补偿逻辑,其中红线框住的部分就是证明。

子事务屏障


在以上的示例中,重复提及子事务屏障,那子事务屏障具体是什么,这里有必要重点说明下。以上面用户1转账10元到用户3为例,整个事务流转过程中,即转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。

在提交事务之后,首先是全局事务的落库,主要由DTM 服务负责,主要包括两张表:trans_global和trans_branch_op,DTM 依此进行子事务分支的协调。其中trans_global会插入一条全局事务记录,用于记录全局事务的状态信息,如下图1所示。trans_branch_op表为trans_global的子表,记录四条子事务分支数据,如下图2所示:

具体的服务再接收到来自Dtm的子事务分支调用时,每次都会往子事务屏障表barrier中插入一条数据,如下图所示。业务服务就是依赖此表来完成子事务的控制。

而子事务屏障的核心就是子事务屏障表唯一键的设计,以gid、branch_id、op和barrier_id为唯一索引,利用唯一索引,“以改代查”来避免竞态条件。在跨行转账的Saga示例中,子事务分支的执行步骤如下所示:

  1. 开启本地事务

  1. 对于当前操作op(action|compensate),使用inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)向子事务屏障表插入一条数据,有几种情况:

  1. 插入成功且影响条数大于0,则继续向下执行。

  1. 插入成功但影响条数等于0,说明触发唯一键约束,此时会进行空补偿、悬挂和重复请求判断,若是则直接返回,跳过后续子事务分支逻辑的执行。

  1. 第2步插入成功,则可以继续执行子事务分支逻辑,执行业务数据表操作,结果分两种请求

  1. 子事务成功,子事务屏障表操作和业务数据表操作由于共享同一个本地事务,提交本地事务,因此可实现强一致性,当前子事务分支完成。

  1. 子事务失败,回滚本地事务

每个子事务分支通过以上步骤,即可实现下图的效果:

小结


本文主要介绍了DTM的Saga模式的应用,基于DTM 首创的子事务屏障技术,使得开发者基于DTM 提供的SDK能够轻松开发出更可靠的分布式应用,彻底将开发人员从网络异常的处理中解放出来,再也不用担心空补偿、防悬挂、幂等等分布式问题。如果要进行分布式事务框架的选型,DTM 将是不二之选。

相关文章:

分布式事务 | 使用DTM 的Saga 模式

DTM 简介前面章节提及的MassTransit、dotnetcore/CAP都提供了分布式事务的处理能力&#xff0c;但也仅局限于Saga和本地消息表模式的实现。那有没有一个独立的分布式事务解决方案&#xff0c;涵盖多种分布式事务处理模式&#xff0c;如Saga、TCC、XA模式等。有&#xff0c;目前…...

错误代码0xc0000001要怎么解决?如何修复错误

出现错误代码0xc0000001这个要怎么解决&#xff1f;其实这个的蓝屏问题还是非常的简单的&#xff0c;有多种方法可以实现 解决方法一 1、首先使用电脑系统自带的修复功能&#xff0c;首先长按开机键强制电脑关机。 注&#xff1a;如果有重要的资料请先提前备份好&#xff0c;…...

为什么 HTTP PATCH 方法不是幂等的及其延伸

幂等性 首先来看什么是幂等性&#xff0c;根据 rfc2616(Hypertext Transfer Protocol – HTTP/1.1) 文档第 50 页底部对 Idempotent Methods 的定义&#xff1a; Methods can also have the property of “idempotence” in that (aside from error or expiration issues) the…...

13 Day:实现内核线程

前言&#xff1a;我们昨天完成了内核的内存池以及内存管理程序&#xff0c;今天我们要揭开操作系统多任务执行的神秘面纱&#xff0c;来了解并实现一个多任务的操作系统。 一&#xff0c;实现内核线程 在聊线程之间我们先聊聊处理器吧&#xff0c;众所周之现在我们的CPU动不动…...

GPU服务器安装显卡驱动、CUDA和cuDNN

GPU服务器安装cuda和cudnn1. 服务器驱动安装2. cuda安装3. cudNN安装4. 安装docker环境5. 安装nvidia-docker25.1 ubuntu系统安装5.2 centos系统安装6. 测试docker容调用GPU服务1. 服务器驱动安装 显卡驱动下载地址https://www.nvidia.cn/Download/index.aspx?langcn显卡驱动…...

结构体变量

C语言允许用户自己建立由不同类型数据组成的组合型的数据结构&#xff0c;它称为结构体&#xff08;structre&#xff09;。 在程序中建立一个结构体类型&#xff1a; 1.结构体 建立结构体 struct Student { int num; //学号为整型 char name[20]; //姓名为字符串 char se…...

Java 多态

文章目录1、多态的介绍2、多态的格式3、对象的强制类型转换4、instanceof 运算符5、案例&#xff1a;笔记本USB接口1、多态的介绍 多态&#xff08;Polymorphism&#xff09;按字面意思理解就是“多种形态”&#xff0c;即一个对象拥有多种形态。 即同一种方法可以根据发送对…...

九龙证券|一夜暴跌36%,美股走势分化,标普指数创近2月最差周度表现

当地时间2月10日&#xff0c;美股三大指数收盘涨跌纷歧。道指涨0.5%&#xff0c;标普500指数涨0.22%&#xff0c;纳指跌0.61%。 受国际油价明显上升影响&#xff0c;动力板块领涨&#xff0c;埃克森美孚、康菲石油涨超4%。大型科技股走低&#xff0c;特斯拉、英伟达跌约5%。热门…...

【数据库】 mysql用户授权详解

目录 MySQL用户授权 一&#xff0c;密码策略 1&#xff0c;查看临时密码 2&#xff0c;查看数据库当前密码策略&#xff1a; 二&#xff0c; 用户授权和撤销授权 1、创建用户 2&#xff0c;删除用户 3&#xff0c;授权和回收权限 MySQL用户授权 一&#xff0c;密码策略…...

【性能】性能测试理论篇_学习笔记_2023/2/11

性能测试的目的验证系统是否能满足用户提出的性能指标发现性能瓶颈&#xff0c;优化系统整体性能性能测试的分类注&#xff1a;这些测试类型其实是密切相关&#xff0c;甚至无法区别的&#xff0c;例如几乎所有的测试都有并发测试。在实际中不用纠结具体的概念。而是要明确测试…...

C语言(输入printf()函数)

printf()的细节操作很多&#xff0c;对于现阶段的朋友来说&#xff0c;主要还是以理解为主。因为很多的确很难用到。 目录 一.转换说明&#xff08;占位符&#xff09; 二.printf()转换说明修饰符 1.数字 2.%数字1.数字2 3.整型转换字符补充 4.标记 -符号 符号 空格符…...

Zabbix 构建监控告警平台(四)

Zabbix ActionZabbix Macros1.Zabbix Action 1.1动作Action简介 当某个触发器状态发生改变(如Problem、OK)&#xff0c;可以采取相应的动作&#xff0c;如&#xff1a; 执行远程命令 邮件&#xff0c;短信&#xff0c;微信告警,电话 1.2告警实验简介 1. 创建告警media type&…...

2004-2019年285个地级市实际GDP与名义GDP

2004-2019年285个地级市实际GDP和名义GDP 1、时间&#xff1a;2004-2019年 2、范围&#xff1a;285个地级市 3、说明&#xff1a;GDP平减指数采用地级市所在省份当年平减指数 4、代码&#xff1a; "gen rgdp gdp if year 2003 gen rgdp gdp if year 2003" re…...

Node.js笔记-Express(基于Node.js的web开发框架)

目录 Express概述 Express安装 基本使用 创建服务器 编写请求接口 接收请求参数 获取路径参数(/login/2) 静态资源托管-express.static&#xff08;内置中间件&#xff09; 什么是静态资源托管&#xff1f; express.static() 应用举例 托管多个静态资源 挂载路径前缀…...

力扣sql简单篇练习(十五)

力扣sql简单篇练习(十五) 1 直线上的最近距离 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 1.2 示例sql语句 SELECT min(abs(p1.x-p2.x)) shortest FROM point p1 INNER JOIN point p2 ON p1.x <>p2.x1.3 运行截图 2 只出现一次的最大数字 2.1 题目内容 2…...

浅谈动态代理

什么是动态代理&#xff1f;以下为个人理解:动态代理就是在程序运行的期间&#xff0c;动态地针对对象的方法进行增强操作。并且这个动作的执行者已经不是"this"对象了&#xff0c;而是我们创建的代理对象&#xff0c;这个代理对象就是类似中间人的角色&#xff0c;帮…...

Idea超好用的管理工具ToolBox(附带idea工具)

文章目录为什么要用ToolBox总结idea管理安装、更新、卸载寻找ide配置、根路径idea使用准备工作配置为什么要用ToolBox 快速轻松地更新,轻松管理您的 JetBrains 工具 安装自动更新同时更新插件和 IDE回滚和降级通过下载补丁或一组补丁而不是整个包&#xff0c;节省维护 IDE 的…...

Spring 中 ApplicationContext 和 BeanFactory 的区别

文章目录类图包目录不同国际化强大的事件机制&#xff08;Event&#xff09;底层资源的访问延迟加载常用容器类图 包目录不同 spring-beans.jar 中 org.springframework.beans.factory.BeanFactoryspring-context.jar 中 org.springframework.context.ApplicationContext 国际…...

情人节有哪些数码好物值得送礼?情人节实用性强的数码好物推荐

转瞬间&#xff0c;情人节快到了&#xff0c;大家还在为送什么礼物而烦恼&#xff1f;在这个以科技为主的时代&#xff0c;人们正在享受着科技带来的便利&#xff0c;其中&#xff0c;数码产品也成为了日常生活中必不可少的存在。接下来&#xff0c;我来给大家推荐几款比较实用…...

java中flatMap用法

java中map是把集合每个元素重新映射&#xff0c;元素个数不变&#xff0c;但是元素值发生了变化。而flatMap从字面上来说是压平这个映射&#xff0c;实际作用就是将每个元素进行一个一对多的拆分&#xff0c;细分成更小的单元&#xff0c;返回一个新的Stream流&#xff0c;新的…...

【MySQL Shell】8.9.2 InnoDB ClusterSet 集群中的不一致事务集(GTID集)

AdminAPI 的 clusterSet.status() 命令警告您&#xff0c;如果 InnoDB 集群的 GTID 集与 InnoDB ClusterSet 中主集群上的 GTID 集不一致。与 InnoDB ClusterSet 中的其他集群相比&#xff0c;处于此状态的集群具有额外的事务&#xff0c;并且具有全局状态 OK_NOT_CONSISTENT 。…...

logstash毫秒时间戳转日期以及使用业务日志时间戳替换原始@timestamp

文章目录问题解决方式参考问题 在使用Kibana观察日志排查问题时发现存在很多组的timestamp 数据一样&#xff0c;如下所示 详细观察内部数据发现其中日志数据有一个timestamp字段保存的是业务日志的毫秒级时间戳&#xff0c;经过和timestamp数据对比发现二者的时间不匹配。经…...

【C语言】qsort——回调函数

目录 1.回调函数 2.qsort函数 //整形数组排序 //结构体排序 3.模拟实现qsort //整型数组排序 //结构体排序 1.回调函数 回调函数就是一个通过函数指针调用的函数。如果你把函数的指针&#xff08;地址&#xff09;作为参数传递给另一个函数&#xff0c;当这个指针被用来…...

8年软件测试工程师经验感悟

不知不觉在软件测试行业&#xff0c;野蛮生长了8年之久。这一路上拥有了非常多的感受。有迷茫&#xff0c;有踩过坑&#xff0c;有付出有收获&#xff0c; 有坚持&#xff01; 我一直都在软件测试行业奋战&#xff0c; 毕业时一起入职的好友已经公司内部转岗&#xff0c;去选择…...

腾讯云安全组配置参考版

官方文档参考: 云服务器 安全组应用案例-操作指南-文档中心-腾讯云 新建安全组时&#xff0c;您可以选择腾讯云为您提供的两种安全组模板&#xff1a; 放通全部端口模板&#xff1a;将会放通所有出入站流量。放通常用端口模板&#xff1a;将会放通 TCP 22端口&#xff08;Lin…...

代码覆盖率工具OpenCppCoverage在Windows上的使用

OpenCppCoverage是用在Windows C上的开源的代码覆盖率工具&#xff0c;源码地址为https://github.com/OpenCppCoverage/OpenCppCoverage &#xff0c;最新发布版本为0.9.9.0&#xff0c;License为GPL-3.0。 从https://github.com/OpenCppCoverage/OpenCppCoverage/releases 下载…...

代码随想录算法训练营第24天25天|● 77. 组合● 216.组合总和III ● 17.电话号码的字母组合

77组合 看完题后的思路 void f&#xff08;数组&#xff0c;startIndex&#xff09;递归终止 if&#xff08;startIndex数组长度||path.sizek&#xff09;{ if(path.sizek){ 加入} }递归 for&#xff08;&#xff1b;startIndex<num.size&#xff1b;startIndex&#xff0…...

Python_pytorch

python_pytorch 小土堆pytotch学习视频链接 from的是一个个的包&#xff08;package) import 的是一个个的py文件(file.py) 所使用的一般是文件中的类(.class) 第一步实例化所使用的类,然后调用类中的方法&#xff08;def) Dataset 数据集处理 import os from PIL impo…...

【Java|golang】2335. 装满杯子需要的最短总时长

现有一台饮水机&#xff0c;可以制备冷水、温水和热水。每秒钟&#xff0c;可以装满 2 杯 不同 类型的水或者 1 杯任意类型的水。 给你一个下标从 0 开始、长度为 3 的整数数组 amount &#xff0c;其中 amount[0]、amount[1] 和 amount[2] 分别表示需要装满冷水、温水和热水的…...

shell编程之sed

文章目录八、shell编程之sed8.1 工作原理8.2 sed基本语法8.3 模式空间中的编辑操作8.3.1 地址定界8.3.2 常用编辑命令8.4 sed扩展八、shell编程之sed 8.1 工作原理 sed是一种流编辑器&#xff0c;它是文本处理中非常有用的工具&#xff0c;能够完美的配合正则表达式使用&…...

建设区块链网站/今日热点新闻10条

Python学习教程&#xff08;Python学习视频_Python学些路线&#xff09;&#xff1a;字符串和常用数据结构 使用字符串 第二次世界大战促使了现代电子计算机的诞生&#xff0c;当初的想法很简单&#xff0c;就是用计算机来计算导弹的弹道&#xff0c;因此在计算机刚刚诞生的那…...

网站建设公司及网络安全法/网站seo优化心得

平时经常使用AsyncTask&#xff0c;深入学习一下&#xff0c;看看它究竟是如何工作的&#xff0c;先看代码。 /*** Creates a new asynchronous task. This constructor must be invoked on the UI thread.** hide*/public AsyncTask(Nullable Looper callbackLooper) {mHandle…...

教学网站开发应用指导方案/seo的主要工作是什么

文章目录前言1 RFM模型介绍2 代码实践(1) 导入库(2) 读取数据(3) 数据审查(4) 数据预处理,去除缺失值和异常值(5) 确定RFM划分区间&#xff08;6&#xff09;计算RFM因子权重&#xff08;7&#xff09;RFM计算过程(8) 保存结果到excel总结**小小结**&#xff1a;3 案例数据结论…...

女人能做网站开发吗/手机网站关键词快速排名

本文讲的是使用 Sketch 和 Pixate 构建 Material Design 原型 - 第二部分&#xff0c;在教程的 第一部分 我们制作了一个简单的登录界面并导出了所有资源。 在第二部分&#xff0c;我们打算继续在 Pixate 里创建一个原型。对于这一部分&#xff0c;你需要&#xff1a; Android …...

怎么做中英文的网站/直链平台

#目录 文章目录1. Redis安装约定2. 下载Redis安装包3. 解压并进入安装包中查看重要的文件及目录4. 正式安装redis5. 安装完成后查看安装目录6. 配置环境变量7. 拷贝安装包目录下的配置文件到安装目录的conf目录下8. 启动redis服务9. 再次启动redis服务10. 检查redis服务是否启动…...

网站信息安全建设方案/网络营销平台的主要功能

计算机视觉的研究领域 图像分类 语义分割 分类和定位 目标检测 实例分割 人脸识别 生成模型 风格迁移 物体跟踪 图像问答 转载于:https://www.cnblogs.com/kexinxin/p/9858556.html...