当前位置: 首页 > news >正文

Seata源码——TCC模式解析02

初始化

在SpringBoot启动的时候通过自动注入机制将GlobalTransactionScanner注入进ioc而GlobalTransactionScanner继承AbstractAutoProxyCreatorAbstract 在postProcessAfterInitialization阶段由子类创建代理TccActionInterceptor

GlobalTransactionScanner

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// ...// 注册RM、判断是否需要代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // tcc_fence_log清理任务TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);// 代理逻辑TccActionInterceptorinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); }// ...
}

TCC下的Bean类型

TCC模式下有三种特殊的SpringBean。
1.LocalTCC注释接口的Bean:如案例中的LocalTccAction;
2.RPC服务提供方ServiceBean:如Dubbo中被@DubboService注释的服务实现类,如案例中的StorageTccActionImpl;
3.RPC服务消费方ReferenceBean:如Dubbo中被@DubboReference注入的Bean,如案例中的StorageTccAction;
在这里插入图片描述

判断是否需要代理

TCCBeanParserUtils

public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {// dubbo:service 和 LocalTCC 注册为 RMboolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);if (isRemotingBean) {if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {// LocalTCC 需要被代理 TccActionInterceptorreturn isTccProxyTargetBean(remotingDesc); // 1} else {// dubbo:service(ServiceBean) 不需要被代理return false; // 2}} else {if (remotingDesc == null) {if (isRemotingFactoryBean(bean, beanName, applicationContext)) {// dubbo:reference(Dubbo ReferenceBean) 需要被代理 TccActionInterceptorremotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);return isTccProxyTargetBean(remotingDesc); // 3} else {return false;}} else {return isTccProxyTargetBean(remotingDesc);}}
}

isTccProxyTargetBean判断LocalTCC和ReferenceBean具体是否会被代理,只有接口里有TwoPhaseBusinessAction注解方法的类,才会返回true,被TccActionInterceptor拦截。

public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {if (remotingDesc == null) {return false;}boolean isTccClazz = false;Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();Method[] methods = tccInterfaceClazz.getMethods();TwoPhaseBusinessAction twoPhaseBusinessAction;for (Method method : methods) {twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);if (twoPhaseBusinessAction != null) {isTccClazz = true;break;}}if (!isTccClazz) {return false;}short protocols = remotingDesc.getProtocol();if (Protocols.IN_JVM == protocols) {return true; // local tcc}return remotingDesc.isReference(); // dubbo:reference
}

注册为RM

识别所有LocalTCC和ServiceBean中被TwoPhaseBusinessAction注解标注的方法,每个TwoPhaseBusinessAction注解的方法都作为一个TCCResource注册到TC。

TCCBeanParserUtils

    protected static boolean parserRemotingServiceInfo(Object bean, String beanName) {RemotingParser remotingParser = DefaultRemotingParser.get().isRemoting(bean, beanName);if (remotingParser != null) {return DefaultRemotingParser.get().parserRemotingServiceInfo(bean, beanName, remotingParser) != null;}return false;}

DefaultRemotingParser

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);if (remotingBeanDesc == null) {return null;}remotingServiceMap.put(beanName, remotingBeanDesc);Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();Method[] methods = interfaceClass.getMethods();if (remotingParser.isService(bean, beanName)) {// localTcc or ServiceBeantry {Object targetBean = remotingBeanDesc.getTargetBean();for (Method m : methods) {TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);// 所有TwoPhaseBusinessAction注解标注的方法注册为一个Resourceif (twoPhaseBusinessAction != null) {TCCResource tccResource = new TCCResource();tccResource.setActionName(twoPhaseBusinessAction.name());tccResource.setTargetBean(targetBean);tccResource.setPrepareMethod(m);tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),twoPhaseBusinessAction.commitArgsClasses()));tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),twoPhaseBusinessAction.rollbackArgsClasses()));tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),twoPhaseBusinessAction.commitArgsClasses()));tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),twoPhaseBusinessAction.rollbackArgsClasses()));//注册到TCDefaultResourceManager.get().registerResource(tccResource);}}} catch (Throwable t) {throw new FrameworkException(t, "parser remoting service error");}}if (remotingParser.isReference(bean, beanName)) {remotingBeanDesc.setReference(true);}return remotingBeanDesc;
}

一阶段(Try)

TccActionInterceptor会拦截所有标注了TwoPhaseBusinessAction注解的方法执行invoke方法执行一阶段处理

一阶段其实就做了三件事
1.创建BusinessContext
2.将BusinessContext添加到上下文中让后续的Commit和Rollback能拿到数据
3.创建分支事务

// TccActionInterceptor
private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {return invocation.proceed();}Method method = getActionInterfaceMethod(invocation);TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);if (businessAction != null) {String xid = RootContext.getXID();BranchType previousBranchType = RootContext.getBranchType();if (BranchType.TCC != previousBranchType) {RootContext.bindBranchType(BranchType.TCC);}try {return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,invocation::proceed);} finally {if (BranchType.TCC != previousBranchType) {RootContext.unbindBranchType();}MDC.remove(RootContext.MDC_KEY_BRANCH_ID);}}return invocation.proceed();
}

ActionInterceptorHandler

    public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,Callback<Object> targetCallback) throws Throwable {Map<String, Object> ret = new HashMap<>(4);//TCC nameString actionName = businessAction.name();//创建BusinessActionContext BusinessActionContext actionContext = new BusinessActionContext();//设置全局事务idactionContext.setXid(xid);//设置事务唯一名称 这里是从@TwoPhaseBusinessAction注解里面的name拿过来的actionContext.setActionName(actionName);//注册分支事务String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);//设置分支事务idactionContext.setBranchId(branchId);//MDC put branchIdMDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);//设置BusinessActionContext属性信息Class<?>[] types = method.getParameterTypes();int argIndex = 0;for (Class<?> cls : types) {if (cls.getName().equals(BusinessActionContext.class.getName())) {arguments[argIndex] = actionContext;break;}argIndex++;}//the final parameters of the try methodret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);//执行业务方法,即try方法ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());return ret;}

BusinessActionContext 信息

public class BusinessActionContext implements Serializable {private static final long serialVersionUID = 6539226288677737991L;// 全局事务idprivate String xid;// 分支事务idprivate String branchId;// @TwoPhaseBusinessAction.nameprivate String actionName;// actionContextprivate Map<String, Object> actionContext;} 

actionContext存储了包括:try方法名(sys::prepare)、commit方法名(sys::commit)、rollback方法名(sys::rollback)、actionName(@TwoPhaseBusinessAction.name)、是否开启tccFence(@TwoPhaseBusinessAction.useTCCFence)、参数名称和参数值。

注册分支事务

    protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,BusinessActionContext actionContext) {String actionName = actionContext.getActionName();String xid = actionContext.getXid();//获取actionContext信息Map<String, Object> context = fetchActionRequestContext(method, arguments);context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());//初始化 BusinessContextinitBusinessContext(context, method, businessAction);//初始化上下文initFrameworkContext(context);//设置上下文信息actionContext.setActionContext(context);//init applicationDataMap<String, Object> applicationContext = new HashMap<>(4);applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);String applicationContextStr = JSON.toJSONString(applicationContext);try {//注册分支事务Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,applicationContextStr, null);return String.valueOf(branchId);} catch (Throwable t) {String msg = String.format("TCC branch Register error, xid: %s", xid);LOGGER.error(msg, t);throw new FrameworkException(t, msg);}}

初始化BusinessContext

将TwoPhaseBusinessAction 注解的参数放入上下文中

    protected void initBusinessContext(Map<String, Object> context, Method method,TwoPhaseBusinessAction businessAction) {if (method != null) {//the phase one method namecontext.put(Constants.PREPARE_METHOD, method.getName());}if (businessAction != null) {//the phase two method namecontext.put(Constants.COMMIT_METHOD, businessAction.commitMethod());context.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod());context.put(Constants.ACTION_NAME, businessAction.name());}}

初始化上下文

将本地IP放入上下文中

    protected void initFrameworkContext(Map<String, Object> context) {try {context.put(Constants.HOST_NAME, NetUtil.getLocalIp());} catch (Throwable t) {LOGGER.warn("getLocalIP error", t);}}

注册分支事务

RM进行分支事务的注册

RM进行分支事务的注册
AbstractResourceManager

    @Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));}return response.getBranchId();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);}}
TC处理分支事务注册请求

AbstractCore

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// Step1 根据xid查询global_table得到GlobalSessionGlobalSession globalSession = assertGlobalSessionNotNull(xid, false);// 对于存储模式=file的情况,由于GlobalSession在内存中,所以需要获取锁后再执行// 对于存储模式=db/redis的情况,不需要获取锁return SessionHolder.lockAndExecute(globalSession, () -> {// 状态校验 必须为beginglobalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));// Step2 获取全局锁(只有AT模式需要)branchSessionLock(globalSession, branchSession);try {// Step3 保存分支事务globalSession.addBranch(branchSession);} catch (RuntimeException ex) {branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}return branchSession.getBranchId();});
}

二阶段Commit

TM发起全局事务提交

当Try处理完之后 TM发起GlobalCommitRequest给TC,TC负责执行每个分支事务提交 这里在AT模式里面讲过 不知道的回看

TC处理二阶段提交

public GlobalStatus commit(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// 如果分支事务存在AT模式,先释放全局锁,delete from lock_table where xid = ?globalSession.closeAndClean();// 如果分支事务都是AT模式,则可以执行异步提交if (globalSession.canBeCommittedAsync()) {// 执行异步提交,更新全局事务状态为AsyncCommitting,update global_table set status = AsyncCommitting where xid = ?globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {// TCCglobalSession.changeGlobalStatus(GlobalStatus.Committing);return true;}}return false;});if (shouldCommit) { // 同步提交(TCC)boolean success = doGlobalCommit(globalSession, false);if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {return globalSession.getStatus();}} else { // 异步提交(AT)return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();}
}

1.执行全局事务提交核心逻辑,如果二阶段提交失败,会重试至成功为止。
2.因为这个方法也是AT模式用的 所以假如都是AT模式会被异步调用最后是释放锁 删除分支事务 删除undo_log 删除全局事务 自此提交结束 这个在前面讲过
3.假如是AT和TCC混合使用AT模式的分支事务会在异步任务中再次执行doGlobalCommit异步提交,TCC模式的分支事务还是会在第一次调用doGlobalCommit时同步提交,如果中间存在分支事务提交失败,会异步重试直至成功。

DefaultCore

public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {// AT模式和TCC模式共存的情况下,AT模式跳过同步提交,只对TCC模式分支事务同步提交if (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}try {// Step1 发送BranchCommitRequest给RM,AT模式RM会删除undo_log,TCC模式RM执行二阶段提交BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Committed:// Step2 删除branch_table中的分支事务记录SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_CommitFailed_Unretryable: // 不可重试(XA中有实现)return false;default:if (!retrying) {// 更新全局事务为二阶段提交重试状态,异步重试至成功位置globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {return CONTINUE;} else {return false;}}} catch (Exception ex) {if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}// 某个分支事务处理失败,继续处理后续分支事务return CONTINUE;});// 如果是同步提交,某个分支事务处理失败,直接返回falseif (result != null) {return result;}if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {return false;}if (!retrying) {globalSession.setStatus(GlobalStatus.Committed);}}if (success && globalSession.getBranchSessions().isEmpty()) {// Step3 删除全局事务 delete from global_table where xid = ?SessionHelper.endCommitted(globalSession, retrying);}return success;
}

向RM发送分支事务提交请求

AbstractCore

protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,BranchSession branchSession) throws IOException, TimeoutException {BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);return response.getBranchStatus();
}// AbstractNettyRemotingServer
public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException {// 定位客户端ChannelChannel channel = ChannelManager.getChannel(resourceId, clientId);if (channel == null) {throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);}RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());

获取客户端Channel

对于LocalTCC或者AT模式,分支事务注册与提交是同一个服务实例,通过resourceId+applicationId+ip+port一般就能定位到二阶段通讯的服务实例,但是可能对应服务宕机或者宕机后重连,这边会降级去找同一个ip不同port的,或者同一个applicationId的不同ip:port。
对于TCC模式下二阶段要找ServiceBean服务提供方的情况,直接进入Step2-fallback,找同一个resourceId下的其他applicationId注册的RM,这里就能找到storage-service进行二阶段提交,所以resourceId(actionName)最好全局唯一。

ChannelManager

public static Channel getChannel(String resourceId, String clientId) {Channel resultChannel = null;String[] clientIdInfo = readClientId(clientId);String targetApplicationId = clientIdInfo[0];String targetIP = clientIdInfo[1];int targetPort = Integer.parseInt(clientIdInfo[2]);ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);// Step1 根据resourceId找对应applicationId-ip-port对应channelif (targetApplicationId == null || applicationIdMap == null ||  applicationIdMap.isEmpty()) {return null;}ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);// Step2 根据BranchSession注册的applicationId应用if (ipMap != null && !ipMap.isEmpty()) {// Step3 根据BranchSession注册的ipConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {// Step4 根据BranchSession注册的portRpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);if (exactRpcContext != null) {Channel channel = exactRpcContext.getChannel();if (channel.isActive()) {resultChannel = channel;}}// Step4-fallback 可能原始channel关闭了,遍历BranchSession注册的ip对应的其他port(resourceId+applicationId+ip)if (resultChannel == null) {for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {Channel channel = portMapOnTargetIPEntry.getValue().getChannel();if (channel.isActive()) {resultChannel = channel;break;} }}}// Step3-fallback BranchSession注册的ip没有对应Channel,从resourceId+applicationId找对应channelif (resultChannel == null) {for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap.entrySet()) {if (ipMapEntry.getKey().equals(targetIP)) { continue; }ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {continue;}for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {Channel channel = portMapOnOtherIPEntry.getValue().getChannel();if (channel.isActive()) {resultChannel = channel;break;} }if (resultChannel != null) { break; }}}}// Step2-fallback BranchSession注册的applicationId没有对应channel,从resourceId中找一个Channelif (resultChannel == null) {resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);}return resultChannel;}

分支事务提交请求

AbstractNettyRemotingServer

 ......return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());

RM处理分支事务提交

TCCResourceManager

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// Step1 从本地缓存tccResourceMap中定位到资源对应本地commit方法TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);Object targetTCCBean = tccResource.getTargetBean();Method commitMethod = tccResource.getCommitMethod();try {// Step2 反序列化BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);// Step3 解析commit方法入参列表Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);Object ret;boolean result;// Step4 执行commit方法 也就相当于执行到了业务指定的commit方法if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {// Step4-1 开启useTCCFencetry {result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {throw e.getCause();}} else {// Step4-2 未开启useTCCFenceret = commitMethod.invoke(targetTCCBean, args);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}} else {result = true;}}//如果处理正常返回二阶段已提交 如果异常返回分支事务二阶段提交失败重试return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;} catch (Throwable t) {return BranchStatus.PhaseTwo_CommitFailed_Retryable;}
}

二阶段回滚

TM发起二阶段回滚请求

这个和AT那里的差不多 当TCC里面的try业务代码异常会触发二阶段的回滚

TC处理二阶段回滚请求

DefaultCore

public GlobalStatus rollback(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {globalSession.close();if (globalSession.getStatus() == GlobalStatus.Begin) {// 将全局锁lock_table状态更新为Rollbacking // 将全局事务global_table状态更新为RollbackingglobalSession.changeGlobalStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}// 执行全局回滚boolean rollbackSuccess = doGlobalRollback(globalSession, false);return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}

DefaultCore

public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;//遍历分支事务Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {// Step1 发送BranchRollbackRequestBranchStatus branchStatus = branchRollback(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Rollbacked:// Step2-1 释放全局锁,删除分支事务SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_RollbackFailed_Unretryable: // 回滚失败且无法重试成功SessionHelper.endRollbackFailed(globalSession, retrying);return false;default:// Step2-2 如果RM回滚失败 全局事务状态变为RollbackRetrying 等待重试if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {if (!retrying) {// 如果Step1或Step2步骤异常 全局事务状态变为RollbackRetrying 等待重试globalSession.queueToRetryRollback();}throw new TransactionException(ex);}});// 如果存在一个分支事务回滚失败,则返回falseif (result != null) {return result;}// Step3// 对于file模式,直接删除全局事务// 对于db/redis模式,异步再次执行doGlobalRollback,这里不做任何处理//  防止由于各种网络波动造成分支事务注册成功lock_table和branch_table中始终有残留数据//  导致全局锁一直被占用,无法释放if (success) {SessionHelper.endRollbacked(globalSession, retrying);}return success;
}

SessionHelper

public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {// 如果是重试 或 file模式if (retryGlobal || !DELAY_HANDLE_SESSION) {long beginTime = System.currentTimeMillis();GlobalStatus currentStatus = globalSession.getStatus();boolean retryBranch =currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;if (isTimeoutGlobalStatus(currentStatus)) {globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);} else {globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);}// 删除全局事务global_tableglobalSession.end();}
}

RM处理分支事务回滚

TCCResourceManager

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// Step1 从本地缓存tccResourceMap中定位到资源对应本地rollback方法TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);Object targetTCCBean = tccResource.getTargetBean();Method rollbackMethod = tccResource.getRollbackMethod();try {// Step2 反序列化BusinessActionContext//BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);// Step3 解析rollback方法入参列表Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);Object ret;boolean result;// Step4 执行rollback方法if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {try {result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,args, tccResource.getActionName());} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {throw e.getCause();}} else {ret = rollbackMethod.invoke(targetTCCBean, args);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}} else {result = true;}}return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;} catch (Throwable t) {return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}
}

相关文章:

Seata源码——TCC模式解析02

初始化 在SpringBoot启动的时候通过自动注入机制将GlobalTransactionScanner注入进ioc而GlobalTransactionScanner继承AbstractAutoProxyCreatorAbstract 在postProcessAfterInitialization阶段由子类创建代理TccActionInterceptor GlobalTransactionScanner protected Obje…...

缓存-Redis

Springboot使用Redis 引入pom依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency>在application.yml、application-dev.yml中配置Redis的访…...

PADS Layout安全间距检查报错

问题&#xff1a; 在Pads Layout完成layout后&#xff0c;进行工具-验证设计安全间距检查时&#xff0c;差分对BAK_FIXCLK_100M_P / BAK_FIXCLK_100M_N的安全间距检查报错&#xff0c;最小为3.94mil&#xff0c;但是应该大于等于5mil&#xff1b;如下两张图&#xff1a; 检查&…...

ebpf基础篇(二) ----- ebpf前世今生

bpf 要追述ebpf的历史,就不得不提bpf. bpf(Berkeley Packet Filter)从早(1992年)诞生于类Unix系统中,用于数据包分析. 它提供了数据链路层的接口,可以在数据链路层发送和接收数据.如果网卡支持混杂模式,所有的数据包都可以被接收,即使这些数据包的目的地址是其它主机. BPF最为…...

我的一天:追求专业成长与生活平衡

早晨的序幕&#xff1a;奋斗的开始 今天的一天始于清晨的6点47分。实现了昨天的早睡早起的蜕变计划。洗漱完成之后&#xff0c;7点17分出门&#xff0c;7点33分我抵达公司&#xff0c;为新的一天做好准备。7点52分&#xff0c;我开始我的学习之旅。正如我所体会的&#xff0c;“…...

【动态规划】斐波那契数列模型

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析&#xff08;3&#xff09; 前言 算法原理 1.状态表示 是什么&#xff1f;dp表(一维数组…...

机器人运动学分析与动力学分析主要作用

机器人运动学分析和动力学分析是两个重要的概念&#xff0c;它们在研究和设计工业机器人时起着关键作用。 1. 机器人运动学分析&#xff1a; 机器人运动学是研究机器人运动的科学&#xff0c;它涉及机器人的位置、速度、加速度和轨迹等方面。机器人运动学分析主要包括正解和逆…...

【Java 基础】33 JDBC

文章目录 1. 数据库连接1&#xff09;加载驱动2&#xff09;建立连接 2. 常见操作1&#xff09;创建表2&#xff09;插入数据3&#xff09;查询数据4&#xff09;使用 PreparedStatement5&#xff09;事务管理 3. 注意事项总结 Java Database Connectivity&#xff08;JDBC&…...

Unity中Shader缩放矩阵

文章目录 前言一、直接相乘缩放1、在属性面板定义一个四维变量&#xff0c;用xyz分别控制在xyz轴上的缩放2、在常量缓存区申明该变量3、在顶点着色器对其进行相乘&#xff0c;来缩放变换4、我们来看看效果 二、使用矩阵乘法代替直接相乘缩放的原理1、我们按如下格式得到缩放矩阵…...

Nessus详细安装-windows (保姆级教程)

Nessus描述 Nessus 是一款广泛使用的网络漏洞扫描工具。它由 Tenable Network Security 公司开发&#xff0c;旨在帮助组织评估其计算机系统和网络的安全性。 Nessus 可以执行自动化的漏洞扫描&#xff0c;通过扫描目标系统、识别和评估可能存在的安全漏洞和弱点。它可以检测…...

Stream流的简单使用

stream流的三类方法 获取Stream流 ○ 创建一条流水线,并把数据放到流水线上准备进行操作中间方法 ○ 流水线上的操作 ○ 一次操作完毕之后,还可以继续进行其他操作终结方法 ○ 一个Stream流只能有一个终结方法 ○ 是流水线上的最后一个操作 其实Stream流非常简单&#xff0c;只…...

智能优化算法应用:基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.蛇优化算法4.实验参数设定5.算法结果6.参考文…...

vue和react diff的详解和不同

diff算法 简述&#xff1a;第一次对比真实dom和虚拟树之间的同层差别&#xff0c;后面为对比新旧虚拟dom树之间的同层差别。 虚拟dom 简述&#xff1a;js对象形容模拟真实dom 具体&#xff1a; 1.虚拟dom是存在内存中的js对象&#xff0c;利用内存的高效率运算。虚拟dom属…...

智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鹈鹕算法4.实验参数设定5.算法结果6.参考文献7.MA…...

10:IIC通信

1&#xff1a;IIC通信 I2C总线&#xff08;Inter IC BUS&#xff09; 是由Philips公司开发的一种通用数据总线&#xff0c;应用广泛&#xff0c;下面是一些指标参数&#xff1a; 两根通信线&#xff1a;SCL&#xff08;Serial Clock&#xff0c;串行时钟线&#xff09;、SDA&a…...

互联网上门洗衣洗鞋小程序优势有哪些?

互联网洗鞋店小程序相较于传统洗鞋方式&#xff0c;具有以下优势&#xff1b; 1. 便捷性&#xff1a;用户只需通过手机即可随时随地下单并查询&#xff0c;省去了许多不必要的时间和精力。学生们无需走出宿舍或校园&#xff0c;就能轻松预约洗鞋并取件。 2. 精准定位&#xff1…...

Java中如何优雅地根治null值引起的Bug问题

1. Java对象为null会引发的问题 NullPointerException&#xff1a;当你尝试调用或访问一个null对象的属性或方法时&#xff0c;Java会抛出NullPointerException异常。例如&#xff0c;如果你有一个名为person的变量&#xff0c;它被设置为null&#xff0c;然后你尝试调用perso…...

C# WPF上位机开发(子窗口通知父窗口更新进度)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 这两天在编写代码的时候&#xff0c;正好遇到一个棘手的问题&#xff0c;解决之后感觉挺有意义的&#xff0c;所以先用blog记录一下&#xff0c;后…...

XUbuntu22.04之跨平台容器格式工具:MKVToolNix(二百零三)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…...

vue中的生命周期和VueComponent实例对象

生命周期 生命周期又叫生命周期钩子&#xff0c;生命周期函数 生命周期是&#xff0c;Vue在关键的时刻帮我们调用的一些特殊名字的函数 生命周期的this指向vm或者组件实例对象 mounted会将初始化的Dom挂载到页面上 <template><div class"hello"><…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用&#xff1a;作为微服务架构的网关&#xff0c;统一入口&#xff0c;处理所有外部请求。 核心能力&#xff1a; 路由转发&#xff08;基于路径、服务名等&#xff09;过滤器&#xff08;鉴权、限流、日志、Header 处理&#xff09;支持负…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点&#xff0c;但无自动故障转移能力&#xff0c;Master宕机后需人工切换&#xff0c;期间消息可能无法读取。Slave仅存储数据&#xff0c;无法主动升级为Master响应请求&#xff…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...