Seata源码——TCC模式解析02
初始化
在SpringBoot启动的时候通过自动注入机制将GlobalTransactionScanner注入进ioc而GlobalTransactionScanner继承AbstractAutoProxyCreatorAbstract 在postProcessAfterInitialization阶段由子类创建代理TccActionInterceptor
GlobalTransactionScanner
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// ...// 注册RM、判断是否需要代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // tcc_fence_log清理任务TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);// 代理逻辑TccActionInterceptorinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); }// ...
}
TCC下的Bean类型
TCC模式下有三种特殊的SpringBean。
1.LocalTCC注释接口的Bean:如案例中的LocalTccAction;
2.RPC服务提供方ServiceBean:如Dubbo中被@DubboService注释的服务实现类,如案例中的StorageTccActionImpl;
3.RPC服务消费方ReferenceBean:如Dubbo中被@DubboReference注入的Bean,如案例中的StorageTccAction;
判断是否需要代理
TCCBeanParserUtils
public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {// dubbo:service 和 LocalTCC 注册为 RMboolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);if (isRemotingBean) {if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {// LocalTCC 需要被代理 TccActionInterceptorreturn isTccProxyTargetBean(remotingDesc); // 1} else {// dubbo:service(ServiceBean) 不需要被代理return false; // 2}} else {if (remotingDesc == null) {if (isRemotingFactoryBean(bean, beanName, applicationContext)) {// dubbo:reference(Dubbo ReferenceBean) 需要被代理 TccActionInterceptorremotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);return isTccProxyTargetBean(remotingDesc); // 3} else {return false;}} else {return isTccProxyTargetBean(remotingDesc);}}
}
isTccProxyTargetBean判断LocalTCC和ReferenceBean具体是否会被代理,只有接口里有TwoPhaseBusinessAction注解方法的类,才会返回true,被TccActionInterceptor拦截。
public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {if (remotingDesc == null) {return false;}boolean isTccClazz = false;Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();Method[] methods = tccInterfaceClazz.getMethods();TwoPhaseBusinessAction twoPhaseBusinessAction;for (Method method : methods) {twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);if (twoPhaseBusinessAction != null) {isTccClazz = true;break;}}if (!isTccClazz) {return false;}short protocols = remotingDesc.getProtocol();if (Protocols.IN_JVM == protocols) {return true; // local tcc}return remotingDesc.isReference(); // dubbo:reference
}
注册为RM
识别所有LocalTCC和ServiceBean中被TwoPhaseBusinessAction注解标注的方法,每个TwoPhaseBusinessAction注解的方法都作为一个TCCResource注册到TC。
TCCBeanParserUtils
protected static boolean parserRemotingServiceInfo(Object bean, String beanName) {RemotingParser remotingParser = DefaultRemotingParser.get().isRemoting(bean, beanName);if (remotingParser != null) {return DefaultRemotingParser.get().parserRemotingServiceInfo(bean, beanName, remotingParser) != null;}return false;}
DefaultRemotingParser
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);if (remotingBeanDesc == null) {return null;}remotingServiceMap.put(beanName, remotingBeanDesc);Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();Method[] methods = interfaceClass.getMethods();if (remotingParser.isService(bean, beanName)) {// localTcc or ServiceBeantry {Object targetBean = remotingBeanDesc.getTargetBean();for (Method m : methods) {TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);// 所有TwoPhaseBusinessAction注解标注的方法注册为一个Resourceif (twoPhaseBusinessAction != null) {TCCResource tccResource = new TCCResource();tccResource.setActionName(twoPhaseBusinessAction.name());tccResource.setTargetBean(targetBean);tccResource.setPrepareMethod(m);tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),twoPhaseBusinessAction.commitArgsClasses()));tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),twoPhaseBusinessAction.rollbackArgsClasses()));tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),twoPhaseBusinessAction.commitArgsClasses()));tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),twoPhaseBusinessAction.rollbackArgsClasses()));//注册到TCDefaultResourceManager.get().registerResource(tccResource);}}} catch (Throwable t) {throw new FrameworkException(t, "parser remoting service error");}}if (remotingParser.isReference(bean, beanName)) {remotingBeanDesc.setReference(true);}return remotingBeanDesc;
}
一阶段(Try)
TccActionInterceptor会拦截所有标注了TwoPhaseBusinessAction注解的方法执行invoke方法执行一阶段处理
一阶段其实就做了三件事
1.创建BusinessContext
2.将BusinessContext添加到上下文中让后续的Commit和Rollback能拿到数据
3.创建分支事务
// TccActionInterceptor
private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {return invocation.proceed();}Method method = getActionInterfaceMethod(invocation);TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);if (businessAction != null) {String xid = RootContext.getXID();BranchType previousBranchType = RootContext.getBranchType();if (BranchType.TCC != previousBranchType) {RootContext.bindBranchType(BranchType.TCC);}try {return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,invocation::proceed);} finally {if (BranchType.TCC != previousBranchType) {RootContext.unbindBranchType();}MDC.remove(RootContext.MDC_KEY_BRANCH_ID);}}return invocation.proceed();
}
ActionInterceptorHandler
public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,Callback<Object> targetCallback) throws Throwable {Map<String, Object> ret = new HashMap<>(4);//TCC nameString actionName = businessAction.name();//创建BusinessActionContext BusinessActionContext actionContext = new BusinessActionContext();//设置全局事务idactionContext.setXid(xid);//设置事务唯一名称 这里是从@TwoPhaseBusinessAction注解里面的name拿过来的actionContext.setActionName(actionName);//注册分支事务String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);//设置分支事务idactionContext.setBranchId(branchId);//MDC put branchIdMDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);//设置BusinessActionContext属性信息Class<?>[] types = method.getParameterTypes();int argIndex = 0;for (Class<?> cls : types) {if (cls.getName().equals(BusinessActionContext.class.getName())) {arguments[argIndex] = actionContext;break;}argIndex++;}//the final parameters of the try methodret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);//执行业务方法,即try方法ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());return ret;}
BusinessActionContext 信息
public class BusinessActionContext implements Serializable {private static final long serialVersionUID = 6539226288677737991L;// 全局事务idprivate String xid;// 分支事务idprivate String branchId;// @TwoPhaseBusinessAction.nameprivate String actionName;// actionContextprivate Map<String, Object> actionContext;}
actionContext存储了包括:try方法名(sys::prepare)、commit方法名(sys::commit)、rollback方法名(sys::rollback)、actionName(@TwoPhaseBusinessAction.name)、是否开启tccFence(@TwoPhaseBusinessAction.useTCCFence)、参数名称和参数值。
注册分支事务
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,BusinessActionContext actionContext) {String actionName = actionContext.getActionName();String xid = actionContext.getXid();//获取actionContext信息Map<String, Object> context = fetchActionRequestContext(method, arguments);context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());//初始化 BusinessContextinitBusinessContext(context, method, businessAction);//初始化上下文initFrameworkContext(context);//设置上下文信息actionContext.setActionContext(context);//init applicationDataMap<String, Object> applicationContext = new HashMap<>(4);applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);String applicationContextStr = JSON.toJSONString(applicationContext);try {//注册分支事务Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,applicationContextStr, null);return String.valueOf(branchId);} catch (Throwable t) {String msg = String.format("TCC branch Register error, xid: %s", xid);LOGGER.error(msg, t);throw new FrameworkException(t, msg);}}
初始化BusinessContext
将TwoPhaseBusinessAction 注解的参数放入上下文中
protected void initBusinessContext(Map<String, Object> context, Method method,TwoPhaseBusinessAction businessAction) {if (method != null) {//the phase one method namecontext.put(Constants.PREPARE_METHOD, method.getName());}if (businessAction != null) {//the phase two method namecontext.put(Constants.COMMIT_METHOD, businessAction.commitMethod());context.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod());context.put(Constants.ACTION_NAME, businessAction.name());}}
初始化上下文
将本地IP放入上下文中
protected void initFrameworkContext(Map<String, Object> context) {try {context.put(Constants.HOST_NAME, NetUtil.getLocalIp());} catch (Throwable t) {LOGGER.warn("getLocalIP error", t);}}
注册分支事务
RM进行分支事务的注册
RM进行分支事务的注册
AbstractResourceManager
@Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));}return response.getBranchId();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);}}
TC处理分支事务注册请求
AbstractCore
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// Step1 根据xid查询global_table得到GlobalSessionGlobalSession globalSession = assertGlobalSessionNotNull(xid, false);// 对于存储模式=file的情况,由于GlobalSession在内存中,所以需要获取锁后再执行// 对于存储模式=db/redis的情况,不需要获取锁return SessionHolder.lockAndExecute(globalSession, () -> {// 状态校验 必须为beginglobalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));// Step2 获取全局锁(只有AT模式需要)branchSessionLock(globalSession, branchSession);try {// Step3 保存分支事务globalSession.addBranch(branchSession);} catch (RuntimeException ex) {branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}return branchSession.getBranchId();});
}
二阶段Commit
TM发起全局事务提交
当Try处理完之后 TM发起GlobalCommitRequest给TC,TC负责执行每个分支事务提交 这里在AT模式里面讲过 不知道的回看
TC处理二阶段提交
public GlobalStatus commit(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// 如果分支事务存在AT模式,先释放全局锁,delete from lock_table where xid = ?globalSession.closeAndClean();// 如果分支事务都是AT模式,则可以执行异步提交if (globalSession.canBeCommittedAsync()) {// 执行异步提交,更新全局事务状态为AsyncCommitting,update global_table set status = AsyncCommitting where xid = ?globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {// TCCglobalSession.changeGlobalStatus(GlobalStatus.Committing);return true;}}return false;});if (shouldCommit) { // 同步提交(TCC)boolean success = doGlobalCommit(globalSession, false);if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {return globalSession.getStatus();}} else { // 异步提交(AT)return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();}
}
1.执行全局事务提交核心逻辑,如果二阶段提交失败,会重试至成功为止。
2.因为这个方法也是AT模式用的 所以假如都是AT模式会被异步调用最后是释放锁 删除分支事务 删除undo_log 删除全局事务 自此提交结束 这个在前面讲过
3.假如是AT和TCC混合使用AT模式的分支事务会在异步任务中再次执行doGlobalCommit异步提交,TCC模式的分支事务还是会在第一次调用doGlobalCommit时同步提交,如果中间存在分支事务提交失败,会异步重试直至成功。
DefaultCore
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {// AT模式和TCC模式共存的情况下,AT模式跳过同步提交,只对TCC模式分支事务同步提交if (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}try {// Step1 发送BranchCommitRequest给RM,AT模式RM会删除undo_log,TCC模式RM执行二阶段提交BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Committed:// Step2 删除branch_table中的分支事务记录SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_CommitFailed_Unretryable: // 不可重试(XA中有实现)return false;default:if (!retrying) {// 更新全局事务为二阶段提交重试状态,异步重试至成功位置globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {return CONTINUE;} else {return false;}}} catch (Exception ex) {if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}// 某个分支事务处理失败,继续处理后续分支事务return CONTINUE;});// 如果是同步提交,某个分支事务处理失败,直接返回falseif (result != null) {return result;}if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {return false;}if (!retrying) {globalSession.setStatus(GlobalStatus.Committed);}}if (success && globalSession.getBranchSessions().isEmpty()) {// Step3 删除全局事务 delete from global_table where xid = ?SessionHelper.endCommitted(globalSession, retrying);}return success;
}
向RM发送分支事务提交请求
AbstractCore
protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,BranchSession branchSession) throws IOException, TimeoutException {BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);return response.getBranchStatus();
}// AbstractNettyRemotingServer
public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException {// 定位客户端ChannelChannel channel = ChannelManager.getChannel(resourceId, clientId);if (channel == null) {throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);}RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
获取客户端Channel
对于LocalTCC或者AT模式,分支事务注册与提交是同一个服务实例,通过resourceId+applicationId+ip+port一般就能定位到二阶段通讯的服务实例,但是可能对应服务宕机或者宕机后重连,这边会降级去找同一个ip不同port的,或者同一个applicationId的不同ip:port。
对于TCC模式下二阶段要找ServiceBean服务提供方的情况,直接进入Step2-fallback,找同一个resourceId下的其他applicationId注册的RM,这里就能找到storage-service进行二阶段提交,所以resourceId(actionName)最好全局唯一。
ChannelManager
public static Channel getChannel(String resourceId, String clientId) {Channel resultChannel = null;String[] clientIdInfo = readClientId(clientId);String targetApplicationId = clientIdInfo[0];String targetIP = clientIdInfo[1];int targetPort = Integer.parseInt(clientIdInfo[2]);ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);// Step1 根据resourceId找对应applicationId-ip-port对应channelif (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {return null;}ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);// Step2 根据BranchSession注册的applicationId应用if (ipMap != null && !ipMap.isEmpty()) {// Step3 根据BranchSession注册的ipConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {// Step4 根据BranchSession注册的portRpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);if (exactRpcContext != null) {Channel channel = exactRpcContext.getChannel();if (channel.isActive()) {resultChannel = channel;}}// Step4-fallback 可能原始channel关闭了,遍历BranchSession注册的ip对应的其他port(resourceId+applicationId+ip)if (resultChannel == null) {for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {Channel channel = portMapOnTargetIPEntry.getValue().getChannel();if (channel.isActive()) {resultChannel = channel;break;} }}}// Step3-fallback BranchSession注册的ip没有对应Channel,从resourceId+applicationId找对应channelif (resultChannel == null) {for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap.entrySet()) {if (ipMapEntry.getKey().equals(targetIP)) { continue; }ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {continue;}for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {Channel channel = portMapOnOtherIPEntry.getValue().getChannel();if (channel.isActive()) {resultChannel = channel;break;} }if (resultChannel != null) { break; }}}}// Step2-fallback BranchSession注册的applicationId没有对应channel,从resourceId中找一个Channelif (resultChannel == null) {resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);}return resultChannel;}
分支事务提交请求
AbstractNettyRemotingServer
......return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
RM处理分支事务提交
TCCResourceManager
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// Step1 从本地缓存tccResourceMap中定位到资源对应本地commit方法TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);Object targetTCCBean = tccResource.getTargetBean();Method commitMethod = tccResource.getCommitMethod();try {// Step2 反序列化BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);// Step3 解析commit方法入参列表Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);Object ret;boolean result;// Step4 执行commit方法 也就相当于执行到了业务指定的commit方法if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {// Step4-1 开启useTCCFencetry {result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {throw e.getCause();}} else {// Step4-2 未开启useTCCFenceret = commitMethod.invoke(targetTCCBean, args);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}} else {result = true;}}//如果处理正常返回二阶段已提交 如果异常返回分支事务二阶段提交失败重试return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;} catch (Throwable t) {return BranchStatus.PhaseTwo_CommitFailed_Retryable;}
}
二阶段回滚
TM发起二阶段回滚请求
这个和AT那里的差不多 当TCC里面的try业务代码异常会触发二阶段的回滚
TC处理二阶段回滚请求
DefaultCore
public GlobalStatus rollback(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {globalSession.close();if (globalSession.getStatus() == GlobalStatus.Begin) {// 将全局锁lock_table状态更新为Rollbacking // 将全局事务global_table状态更新为RollbackingglobalSession.changeGlobalStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}// 执行全局回滚boolean rollbackSuccess = doGlobalRollback(globalSession, false);return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}
DefaultCore
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;//遍历分支事务Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {// Step1 发送BranchRollbackRequestBranchStatus branchStatus = branchRollback(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Rollbacked:// Step2-1 释放全局锁,删除分支事务SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_RollbackFailed_Unretryable: // 回滚失败且无法重试成功SessionHelper.endRollbackFailed(globalSession, retrying);return false;default:// Step2-2 如果RM回滚失败 全局事务状态变为RollbackRetrying 等待重试if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {if (!retrying) {// 如果Step1或Step2步骤异常 全局事务状态变为RollbackRetrying 等待重试globalSession.queueToRetryRollback();}throw new TransactionException(ex);}});// 如果存在一个分支事务回滚失败,则返回falseif (result != null) {return result;}// Step3// 对于file模式,直接删除全局事务// 对于db/redis模式,异步再次执行doGlobalRollback,这里不做任何处理// 防止由于各种网络波动造成分支事务注册成功lock_table和branch_table中始终有残留数据// 导致全局锁一直被占用,无法释放if (success) {SessionHelper.endRollbacked(globalSession, retrying);}return success;
}
SessionHelper
public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {// 如果是重试 或 file模式if (retryGlobal || !DELAY_HANDLE_SESSION) {long beginTime = System.currentTimeMillis();GlobalStatus currentStatus = globalSession.getStatus();boolean retryBranch =currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;if (isTimeoutGlobalStatus(currentStatus)) {globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);} else {globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);}// 删除全局事务global_tableglobalSession.end();}
}
RM处理分支事务回滚
TCCResourceManager
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// Step1 从本地缓存tccResourceMap中定位到资源对应本地rollback方法TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);Object targetTCCBean = tccResource.getTargetBean();Method rollbackMethod = tccResource.getRollbackMethod();try {// Step2 反序列化BusinessActionContext//BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);// Step3 解析rollback方法入参列表Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);Object ret;boolean result;// Step4 执行rollback方法if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {try {result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,args, tccResource.getActionName());} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {throw e.getCause();}} else {ret = rollbackMethod.invoke(targetTCCBean, args);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}} else {result = true;}}return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;} catch (Throwable t) {return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}
}
相关文章:

Seata源码——TCC模式解析02
初始化 在SpringBoot启动的时候通过自动注入机制将GlobalTransactionScanner注入进ioc而GlobalTransactionScanner继承AbstractAutoProxyCreatorAbstract 在postProcessAfterInitialization阶段由子类创建代理TccActionInterceptor GlobalTransactionScanner protected Obje…...

缓存-Redis
Springboot使用Redis 引入pom依赖: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency>在application.yml、application-dev.yml中配置Redis的访…...

PADS Layout安全间距检查报错
问题: 在Pads Layout完成layout后,进行工具-验证设计安全间距检查时,差分对BAK_FIXCLK_100M_P / BAK_FIXCLK_100M_N的安全间距检查报错,最小为3.94mil,但是应该大于等于5mil;如下两张图: 检查&…...

ebpf基础篇(二) ----- ebpf前世今生
bpf 要追述ebpf的历史,就不得不提bpf. bpf(Berkeley Packet Filter)从早(1992年)诞生于类Unix系统中,用于数据包分析. 它提供了数据链路层的接口,可以在数据链路层发送和接收数据.如果网卡支持混杂模式,所有的数据包都可以被接收,即使这些数据包的目的地址是其它主机. BPF最为…...

我的一天:追求专业成长与生活平衡
早晨的序幕:奋斗的开始 今天的一天始于清晨的6点47分。实现了昨天的早睡早起的蜕变计划。洗漱完成之后,7点17分出门,7点33分我抵达公司,为新的一天做好准备。7点52分,我开始我的学习之旅。正如我所体会的,“…...

【动态规划】斐波那契数列模型
欢迎来到Cefler的博客😁 🕌博客主页:那个传说中的man的主页 🏠个人专栏:题目解析 🌎推荐文章:题目大解析(3) 前言 算法原理 1.状态表示 是什么?dp表(一维数组…...

机器人运动学分析与动力学分析主要作用
机器人运动学分析和动力学分析是两个重要的概念,它们在研究和设计工业机器人时起着关键作用。 1. 机器人运动学分析: 机器人运动学是研究机器人运动的科学,它涉及机器人的位置、速度、加速度和轨迹等方面。机器人运动学分析主要包括正解和逆…...

【Java 基础】33 JDBC
文章目录 1. 数据库连接1)加载驱动2)建立连接 2. 常见操作1)创建表2)插入数据3)查询数据4)使用 PreparedStatement5)事务管理 3. 注意事项总结 Java Database Connectivity(JDBC&…...

Unity中Shader缩放矩阵
文章目录 前言一、直接相乘缩放1、在属性面板定义一个四维变量,用xyz分别控制在xyz轴上的缩放2、在常量缓存区申明该变量3、在顶点着色器对其进行相乘,来缩放变换4、我们来看看效果 二、使用矩阵乘法代替直接相乘缩放的原理1、我们按如下格式得到缩放矩阵…...

Nessus详细安装-windows (保姆级教程)
Nessus描述 Nessus 是一款广泛使用的网络漏洞扫描工具。它由 Tenable Network Security 公司开发,旨在帮助组织评估其计算机系统和网络的安全性。 Nessus 可以执行自动化的漏洞扫描,通过扫描目标系统、识别和评估可能存在的安全漏洞和弱点。它可以检测…...

Stream流的简单使用
stream流的三类方法 获取Stream流 ○ 创建一条流水线,并把数据放到流水线上准备进行操作中间方法 ○ 流水线上的操作 ○ 一次操作完毕之后,还可以继续进行其他操作终结方法 ○ 一个Stream流只能有一个终结方法 ○ 是流水线上的最后一个操作 其实Stream流非常简单,只…...

智能优化算法应用:基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.蛇优化算法4.实验参数设定5.算法结果6.参考文…...

vue和react diff的详解和不同
diff算法 简述:第一次对比真实dom和虚拟树之间的同层差别,后面为对比新旧虚拟dom树之间的同层差别。 虚拟dom 简述:js对象形容模拟真实dom 具体: 1.虚拟dom是存在内存中的js对象,利用内存的高效率运算。虚拟dom属…...

智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鹈鹕算法4.实验参数设定5.算法结果6.参考文献7.MA…...

10:IIC通信
1:IIC通信 I2C总线(Inter IC BUS) 是由Philips公司开发的一种通用数据总线,应用广泛,下面是一些指标参数: 两根通信线:SCL(Serial Clock,串行时钟线)、SDA&a…...

互联网上门洗衣洗鞋小程序优势有哪些?
互联网洗鞋店小程序相较于传统洗鞋方式,具有以下优势; 1. 便捷性:用户只需通过手机即可随时随地下单并查询,省去了许多不必要的时间和精力。学生们无需走出宿舍或校园,就能轻松预约洗鞋并取件。 2. 精准定位࿱…...

Java中如何优雅地根治null值引起的Bug问题
1. Java对象为null会引发的问题 NullPointerException:当你尝试调用或访问一个null对象的属性或方法时,Java会抛出NullPointerException异常。例如,如果你有一个名为person的变量,它被设置为null,然后你尝试调用perso…...

C# WPF上位机开发(子窗口通知父窗口更新进度)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 这两天在编写代码的时候,正好遇到一个棘手的问题,解决之后感觉挺有意义的,所以先用blog记录一下,后…...

XUbuntu22.04之跨平台容器格式工具:MKVToolNix(二百零三)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…...

vue中的生命周期和VueComponent实例对象
生命周期 生命周期又叫生命周期钩子,生命周期函数 生命周期是,Vue在关键的时刻帮我们调用的一些特殊名字的函数 生命周期的this指向vm或者组件实例对象 mounted会将初始化的Dom挂载到页面上 <template><div class"hello"><…...

Hooked协议掀起WEB3新浪潮
随着区块链技术和加密货币的兴起,币圈已经成为全球范围内的一个热门领域。在这个充满机遇与挑战的行业中,Hook机制正逐渐成为一种重要的技术手段,为投资者、开发者以及相关机构提供了更多的选择和可能性。本文将详细介绍币圈中的Hook机制&…...

【图文教程】windows 下 MongoDB 介绍下载安装配置
文章目录 介绍MySQL 之间的区别和适用场景差异数据模型:查询语言:可扩展性:数据一致性: 下载安装环境变量配置 介绍 MongoDB 是一种开源的、面向文档的 NoSQL 数据库管理系统。它使用灵活的文档模型来存储数据,这意味…...

算法复杂度-BigO表示法
1.时间复杂度--大O表示法 算法的渐进时间复杂度,T(n)O(f(n)) T(n)表示算法的渐进时间复杂度 f(n)表示代码执行的次数 O()表示正比例关系 2.常用的时间复杂度量级 3.举例 (1&am…...

测试理论知识五:功能测试、系统测试、验收测试、安装测试、测试的计划与控制
模块测试的目的是发现程序模块与其接口规格说明之间的不一致。 功能测试的目的是为了证明程序未能符合其外部规格说明。 系统测试的目的是为了证明软件产品与其初始目标不一致。 1. 功能测试 功能测试是一个试图发现程序与其外部规格说明之间存在不一致的过程。功能测试通…...

太阳能爆闪警示灯
适用场所: 适用于高压线,塔吊,路政,船舶,种植,塔机,航海航道等场所起警示作用。 产品特点: 光控无开关,白天不闪,昏暗环境自动闪烁,无需手动操作,省时省事; 采用红色LED作光源,亮度高&#…...

怎么为pdf文件添加水印?
怎么为pdf文件添加水印?PDF是一种很好用的文件格式,这种格式能够很有效的保护我们的文件,但有时可能还会被破解,这种时候在PDF上添加水印就是比较好的方法。 综上所述,PDF是保密性很强的文件,但添加水印能够…...

基于ssm医药信息管理系统论文
基于SSM的医药信息管理系统的设计与实现 摘要 当下,正处于信息化的时代,许多行业顺应时代的变化,结合使用计算机技术向数字化、信息化建设迈进。以前相关行业对于医药信息的管理和控制,采用人工登记的方式保存相关数据ÿ…...

Ceph存储体系架构?
Ceph体系架构主要由RADOS和RADOS GW和RBD以及CephFS构成。 RADOS(Reliable, Autonomic Distributed Object Store)是Ceph的底层核心,RADOS本身也是分布式存储系统,CEPH所有的存储功能都是基于RADOS实现。RADOS由两个组件组成&…...

详解现实世界资产(RWAs)
区块链中的现实世界资产(RWAs)是代表实际和传统金融资产的数字通证,如货币、大宗商品、股票和债券。 实际世界资产(RWA)的通证化是区块链行业中最大的市场机会之一,潜在市场规模可达数万万亿美元。理论上&…...

Windows漏洞利用开发——利用ROP绕过DEP保护
实验6 Windows漏洞利用开发 6.1实验名称 Windows漏洞利用开发 6.2实验目的 学习windows漏洞利用开发,使用kali linux相关工具对windows内目标程序进行漏洞利用 6.3实验步骤及内容 第三阶段:利用ROP绕过DEP保护 了解DEP保护理解构造ROP链从而绕过DEP…...