Seata源码——TCC模式解析02
初始化
在SpringBoot启动的时候通过自动注入机制将GlobalTransactionScanner注入进ioc而GlobalTransactionScanner继承AbstractAutoProxyCreatorAbstract 在postProcessAfterInitialization阶段由子类创建代理TccActionInterceptor
GlobalTransactionScanner
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// ...// 注册RM、判断是否需要代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // tcc_fence_log清理任务TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);// 代理逻辑TccActionInterceptorinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); }// ...
}
TCC下的Bean类型
TCC模式下有三种特殊的SpringBean。
1.LocalTCC注释接口的Bean:如案例中的LocalTccAction;
2.RPC服务提供方ServiceBean:如Dubbo中被@DubboService注释的服务实现类,如案例中的StorageTccActionImpl;
3.RPC服务消费方ReferenceBean:如Dubbo中被@DubboReference注入的Bean,如案例中的StorageTccAction;
判断是否需要代理
TCCBeanParserUtils
public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {// dubbo:service 和 LocalTCC 注册为 RMboolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);if (isRemotingBean) {if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {// LocalTCC 需要被代理 TccActionInterceptorreturn isTccProxyTargetBean(remotingDesc); // 1} else {// dubbo:service(ServiceBean) 不需要被代理return false; // 2}} else {if (remotingDesc == null) {if (isRemotingFactoryBean(bean, beanName, applicationContext)) {// dubbo:reference(Dubbo ReferenceBean) 需要被代理 TccActionInterceptorremotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);return isTccProxyTargetBean(remotingDesc); // 3} else {return false;}} else {return isTccProxyTargetBean(remotingDesc);}}
}
isTccProxyTargetBean判断LocalTCC和ReferenceBean具体是否会被代理,只有接口里有TwoPhaseBusinessAction注解方法的类,才会返回true,被TccActionInterceptor拦截。
public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {if (remotingDesc == null) {return false;}boolean isTccClazz = false;Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();Method[] methods = tccInterfaceClazz.getMethods();TwoPhaseBusinessAction twoPhaseBusinessAction;for (Method method : methods) {twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);if (twoPhaseBusinessAction != null) {isTccClazz = true;break;}}if (!isTccClazz) {return false;}short protocols = remotingDesc.getProtocol();if (Protocols.IN_JVM == protocols) {return true; // local tcc}return remotingDesc.isReference(); // dubbo:reference
}
注册为RM
识别所有LocalTCC和ServiceBean中被TwoPhaseBusinessAction注解标注的方法,每个TwoPhaseBusinessAction注解的方法都作为一个TCCResource注册到TC。
TCCBeanParserUtils
protected static boolean parserRemotingServiceInfo(Object bean, String beanName) {RemotingParser remotingParser = DefaultRemotingParser.get().isRemoting(bean, beanName);if (remotingParser != null) {return DefaultRemotingParser.get().parserRemotingServiceInfo(bean, beanName, remotingParser) != null;}return false;}
DefaultRemotingParser
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);if (remotingBeanDesc == null) {return null;}remotingServiceMap.put(beanName, remotingBeanDesc);Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();Method[] methods = interfaceClass.getMethods();if (remotingParser.isService(bean, beanName)) {// localTcc or ServiceBeantry {Object targetBean = remotingBeanDesc.getTargetBean();for (Method m : methods) {TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);// 所有TwoPhaseBusinessAction注解标注的方法注册为一个Resourceif (twoPhaseBusinessAction != null) {TCCResource tccResource = new TCCResource();tccResource.setActionName(twoPhaseBusinessAction.name());tccResource.setTargetBean(targetBean);tccResource.setPrepareMethod(m);tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),twoPhaseBusinessAction.commitArgsClasses()));tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),twoPhaseBusinessAction.rollbackArgsClasses()));tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),twoPhaseBusinessAction.commitArgsClasses()));tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),twoPhaseBusinessAction.rollbackArgsClasses()));//注册到TCDefaultResourceManager.get().registerResource(tccResource);}}} catch (Throwable t) {throw new FrameworkException(t, "parser remoting service error");}}if (remotingParser.isReference(bean, beanName)) {remotingBeanDesc.setReference(true);}return remotingBeanDesc;
}
一阶段(Try)
TccActionInterceptor会拦截所有标注了TwoPhaseBusinessAction注解的方法执行invoke方法执行一阶段处理
一阶段其实就做了三件事
1.创建BusinessContext
2.将BusinessContext添加到上下文中让后续的Commit和Rollback能拿到数据
3.创建分支事务
// TccActionInterceptor
private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {return invocation.proceed();}Method method = getActionInterfaceMethod(invocation);TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);if (businessAction != null) {String xid = RootContext.getXID();BranchType previousBranchType = RootContext.getBranchType();if (BranchType.TCC != previousBranchType) {RootContext.bindBranchType(BranchType.TCC);}try {return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,invocation::proceed);} finally {if (BranchType.TCC != previousBranchType) {RootContext.unbindBranchType();}MDC.remove(RootContext.MDC_KEY_BRANCH_ID);}}return invocation.proceed();
}
ActionInterceptorHandler
public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,Callback<Object> targetCallback) throws Throwable {Map<String, Object> ret = new HashMap<>(4);//TCC nameString actionName = businessAction.name();//创建BusinessActionContext BusinessActionContext actionContext = new BusinessActionContext();//设置全局事务idactionContext.setXid(xid);//设置事务唯一名称 这里是从@TwoPhaseBusinessAction注解里面的name拿过来的actionContext.setActionName(actionName);//注册分支事务String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);//设置分支事务idactionContext.setBranchId(branchId);//MDC put branchIdMDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);//设置BusinessActionContext属性信息Class<?>[] types = method.getParameterTypes();int argIndex = 0;for (Class<?> cls : types) {if (cls.getName().equals(BusinessActionContext.class.getName())) {arguments[argIndex] = actionContext;break;}argIndex++;}//the final parameters of the try methodret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);//执行业务方法,即try方法ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());return ret;}
BusinessActionContext 信息
public class BusinessActionContext implements Serializable {private static final long serialVersionUID = 6539226288677737991L;// 全局事务idprivate String xid;// 分支事务idprivate String branchId;// @TwoPhaseBusinessAction.nameprivate String actionName;// actionContextprivate Map<String, Object> actionContext;}
actionContext存储了包括:try方法名(sys::prepare)、commit方法名(sys::commit)、rollback方法名(sys::rollback)、actionName(@TwoPhaseBusinessAction.name)、是否开启tccFence(@TwoPhaseBusinessAction.useTCCFence)、参数名称和参数值。
注册分支事务
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,BusinessActionContext actionContext) {String actionName = actionContext.getActionName();String xid = actionContext.getXid();//获取actionContext信息Map<String, Object> context = fetchActionRequestContext(method, arguments);context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());//初始化 BusinessContextinitBusinessContext(context, method, businessAction);//初始化上下文initFrameworkContext(context);//设置上下文信息actionContext.setActionContext(context);//init applicationDataMap<String, Object> applicationContext = new HashMap<>(4);applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);String applicationContextStr = JSON.toJSONString(applicationContext);try {//注册分支事务Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,applicationContextStr, null);return String.valueOf(branchId);} catch (Throwable t) {String msg = String.format("TCC branch Register error, xid: %s", xid);LOGGER.error(msg, t);throw new FrameworkException(t, msg);}}
初始化BusinessContext
将TwoPhaseBusinessAction 注解的参数放入上下文中
protected void initBusinessContext(Map<String, Object> context, Method method,TwoPhaseBusinessAction businessAction) {if (method != null) {//the phase one method namecontext.put(Constants.PREPARE_METHOD, method.getName());}if (businessAction != null) {//the phase two method namecontext.put(Constants.COMMIT_METHOD, businessAction.commitMethod());context.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod());context.put(Constants.ACTION_NAME, businessAction.name());}}
初始化上下文
将本地IP放入上下文中
protected void initFrameworkContext(Map<String, Object> context) {try {context.put(Constants.HOST_NAME, NetUtil.getLocalIp());} catch (Throwable t) {LOGGER.warn("getLocalIP error", t);}}
注册分支事务
RM进行分支事务的注册
RM进行分支事务的注册
AbstractResourceManager
@Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));}return response.getBranchId();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);}}
TC处理分支事务注册请求
AbstractCore
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// Step1 根据xid查询global_table得到GlobalSessionGlobalSession globalSession = assertGlobalSessionNotNull(xid, false);// 对于存储模式=file的情况,由于GlobalSession在内存中,所以需要获取锁后再执行// 对于存储模式=db/redis的情况,不需要获取锁return SessionHolder.lockAndExecute(globalSession, () -> {// 状态校验 必须为beginglobalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));// Step2 获取全局锁(只有AT模式需要)branchSessionLock(globalSession, branchSession);try {// Step3 保存分支事务globalSession.addBranch(branchSession);} catch (RuntimeException ex) {branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}return branchSession.getBranchId();});
}
二阶段Commit
TM发起全局事务提交
当Try处理完之后 TM发起GlobalCommitRequest给TC,TC负责执行每个分支事务提交 这里在AT模式里面讲过 不知道的回看
TC处理二阶段提交
public GlobalStatus commit(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// 如果分支事务存在AT模式,先释放全局锁,delete from lock_table where xid = ?globalSession.closeAndClean();// 如果分支事务都是AT模式,则可以执行异步提交if (globalSession.canBeCommittedAsync()) {// 执行异步提交,更新全局事务状态为AsyncCommitting,update global_table set status = AsyncCommitting where xid = ?globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {// TCCglobalSession.changeGlobalStatus(GlobalStatus.Committing);return true;}}return false;});if (shouldCommit) { // 同步提交(TCC)boolean success = doGlobalCommit(globalSession, false);if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {return globalSession.getStatus();}} else { // 异步提交(AT)return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();}
}
1.执行全局事务提交核心逻辑,如果二阶段提交失败,会重试至成功为止。
2.因为这个方法也是AT模式用的 所以假如都是AT模式会被异步调用最后是释放锁 删除分支事务 删除undo_log 删除全局事务 自此提交结束 这个在前面讲过
3.假如是AT和TCC混合使用AT模式的分支事务会在异步任务中再次执行doGlobalCommit异步提交,TCC模式的分支事务还是会在第一次调用doGlobalCommit时同步提交,如果中间存在分支事务提交失败,会异步重试直至成功。
DefaultCore
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {// AT模式和TCC模式共存的情况下,AT模式跳过同步提交,只对TCC模式分支事务同步提交if (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}try {// Step1 发送BranchCommitRequest给RM,AT模式RM会删除undo_log,TCC模式RM执行二阶段提交BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Committed:// Step2 删除branch_table中的分支事务记录SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_CommitFailed_Unretryable: // 不可重试(XA中有实现)return false;default:if (!retrying) {// 更新全局事务为二阶段提交重试状态,异步重试至成功位置globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {return CONTINUE;} else {return false;}}} catch (Exception ex) {if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}// 某个分支事务处理失败,继续处理后续分支事务return CONTINUE;});// 如果是同步提交,某个分支事务处理失败,直接返回falseif (result != null) {return result;}if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {return false;}if (!retrying) {globalSession.setStatus(GlobalStatus.Committed);}}if (success && globalSession.getBranchSessions().isEmpty()) {// Step3 删除全局事务 delete from global_table where xid = ?SessionHelper.endCommitted(globalSession, retrying);}return success;
}
向RM发送分支事务提交请求
AbstractCore
protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,BranchSession branchSession) throws IOException, TimeoutException {BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);return response.getBranchStatus();
}// AbstractNettyRemotingServer
public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException {// 定位客户端ChannelChannel channel = ChannelManager.getChannel(resourceId, clientId);if (channel == null) {throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);}RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
获取客户端Channel
对于LocalTCC或者AT模式,分支事务注册与提交是同一个服务实例,通过resourceId+applicationId+ip+port一般就能定位到二阶段通讯的服务实例,但是可能对应服务宕机或者宕机后重连,这边会降级去找同一个ip不同port的,或者同一个applicationId的不同ip:port。
对于TCC模式下二阶段要找ServiceBean服务提供方的情况,直接进入Step2-fallback,找同一个resourceId下的其他applicationId注册的RM,这里就能找到storage-service进行二阶段提交,所以resourceId(actionName)最好全局唯一。
ChannelManager
public static Channel getChannel(String resourceId, String clientId) {Channel resultChannel = null;String[] clientIdInfo = readClientId(clientId);String targetApplicationId = clientIdInfo[0];String targetIP = clientIdInfo[1];int targetPort = Integer.parseInt(clientIdInfo[2]);ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);// Step1 根据resourceId找对应applicationId-ip-port对应channelif (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {return null;}ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);// Step2 根据BranchSession注册的applicationId应用if (ipMap != null && !ipMap.isEmpty()) {// Step3 根据BranchSession注册的ipConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {// Step4 根据BranchSession注册的portRpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);if (exactRpcContext != null) {Channel channel = exactRpcContext.getChannel();if (channel.isActive()) {resultChannel = channel;}}// Step4-fallback 可能原始channel关闭了,遍历BranchSession注册的ip对应的其他port(resourceId+applicationId+ip)if (resultChannel == null) {for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {Channel channel = portMapOnTargetIPEntry.getValue().getChannel();if (channel.isActive()) {resultChannel = channel;break;} }}}// Step3-fallback BranchSession注册的ip没有对应Channel,从resourceId+applicationId找对应channelif (resultChannel == null) {for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap.entrySet()) {if (ipMapEntry.getKey().equals(targetIP)) { continue; }ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {continue;}for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {Channel channel = portMapOnOtherIPEntry.getValue().getChannel();if (channel.isActive()) {resultChannel = channel;break;} }if (resultChannel != null) { break; }}}}// Step2-fallback BranchSession注册的applicationId没有对应channel,从resourceId中找一个Channelif (resultChannel == null) {resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);}return resultChannel;}
分支事务提交请求
AbstractNettyRemotingServer
......return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
RM处理分支事务提交
TCCResourceManager
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// Step1 从本地缓存tccResourceMap中定位到资源对应本地commit方法TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);Object targetTCCBean = tccResource.getTargetBean();Method commitMethod = tccResource.getCommitMethod();try {// Step2 反序列化BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);// Step3 解析commit方法入参列表Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);Object ret;boolean result;// Step4 执行commit方法 也就相当于执行到了业务指定的commit方法if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {// Step4-1 开启useTCCFencetry {result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {throw e.getCause();}} else {// Step4-2 未开启useTCCFenceret = commitMethod.invoke(targetTCCBean, args);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}} else {result = true;}}//如果处理正常返回二阶段已提交 如果异常返回分支事务二阶段提交失败重试return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;} catch (Throwable t) {return BranchStatus.PhaseTwo_CommitFailed_Retryable;}
}
二阶段回滚
TM发起二阶段回滚请求
这个和AT那里的差不多 当TCC里面的try业务代码异常会触发二阶段的回滚
TC处理二阶段回滚请求
DefaultCore
public GlobalStatus rollback(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {globalSession.close();if (globalSession.getStatus() == GlobalStatus.Begin) {// 将全局锁lock_table状态更新为Rollbacking // 将全局事务global_table状态更新为RollbackingglobalSession.changeGlobalStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}// 执行全局回滚boolean rollbackSuccess = doGlobalRollback(globalSession, false);return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}
DefaultCore
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;//遍历分支事务Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {// Step1 发送BranchRollbackRequestBranchStatus branchStatus = branchRollback(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Rollbacked:// Step2-1 释放全局锁,删除分支事务SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_RollbackFailed_Unretryable: // 回滚失败且无法重试成功SessionHelper.endRollbackFailed(globalSession, retrying);return false;default:// Step2-2 如果RM回滚失败 全局事务状态变为RollbackRetrying 等待重试if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {if (!retrying) {// 如果Step1或Step2步骤异常 全局事务状态变为RollbackRetrying 等待重试globalSession.queueToRetryRollback();}throw new TransactionException(ex);}});// 如果存在一个分支事务回滚失败,则返回falseif (result != null) {return result;}// Step3// 对于file模式,直接删除全局事务// 对于db/redis模式,异步再次执行doGlobalRollback,这里不做任何处理// 防止由于各种网络波动造成分支事务注册成功lock_table和branch_table中始终有残留数据// 导致全局锁一直被占用,无法释放if (success) {SessionHelper.endRollbacked(globalSession, retrying);}return success;
}
SessionHelper
public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {// 如果是重试 或 file模式if (retryGlobal || !DELAY_HANDLE_SESSION) {long beginTime = System.currentTimeMillis();GlobalStatus currentStatus = globalSession.getStatus();boolean retryBranch =currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;if (isTimeoutGlobalStatus(currentStatus)) {globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);} else {globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);}// 删除全局事务global_tableglobalSession.end();}
}
RM处理分支事务回滚
TCCResourceManager
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// Step1 从本地缓存tccResourceMap中定位到资源对应本地rollback方法TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);Object targetTCCBean = tccResource.getTargetBean();Method rollbackMethod = tccResource.getRollbackMethod();try {// Step2 反序列化BusinessActionContext//BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);// Step3 解析rollback方法入参列表Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);Object ret;boolean result;// Step4 执行rollback方法if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {try {result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,args, tccResource.getActionName());} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {throw e.getCause();}} else {ret = rollbackMethod.invoke(targetTCCBean, args);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}} else {result = true;}}return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;} catch (Throwable t) {return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}
}
相关文章:

Seata源码——TCC模式解析02
初始化 在SpringBoot启动的时候通过自动注入机制将GlobalTransactionScanner注入进ioc而GlobalTransactionScanner继承AbstractAutoProxyCreatorAbstract 在postProcessAfterInitialization阶段由子类创建代理TccActionInterceptor GlobalTransactionScanner protected Obje…...
缓存-Redis
Springboot使用Redis 引入pom依赖: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency>在application.yml、application-dev.yml中配置Redis的访…...

PADS Layout安全间距检查报错
问题: 在Pads Layout完成layout后,进行工具-验证设计安全间距检查时,差分对BAK_FIXCLK_100M_P / BAK_FIXCLK_100M_N的安全间距检查报错,最小为3.94mil,但是应该大于等于5mil;如下两张图: 检查&…...
ebpf基础篇(二) ----- ebpf前世今生
bpf 要追述ebpf的历史,就不得不提bpf. bpf(Berkeley Packet Filter)从早(1992年)诞生于类Unix系统中,用于数据包分析. 它提供了数据链路层的接口,可以在数据链路层发送和接收数据.如果网卡支持混杂模式,所有的数据包都可以被接收,即使这些数据包的目的地址是其它主机. BPF最为…...
我的一天:追求专业成长与生活平衡
早晨的序幕:奋斗的开始 今天的一天始于清晨的6点47分。实现了昨天的早睡早起的蜕变计划。洗漱完成之后,7点17分出门,7点33分我抵达公司,为新的一天做好准备。7点52分,我开始我的学习之旅。正如我所体会的,“…...

【动态规划】斐波那契数列模型
欢迎来到Cefler的博客😁 🕌博客主页:那个传说中的man的主页 🏠个人专栏:题目解析 🌎推荐文章:题目大解析(3) 前言 算法原理 1.状态表示 是什么?dp表(一维数组…...
机器人运动学分析与动力学分析主要作用
机器人运动学分析和动力学分析是两个重要的概念,它们在研究和设计工业机器人时起着关键作用。 1. 机器人运动学分析: 机器人运动学是研究机器人运动的科学,它涉及机器人的位置、速度、加速度和轨迹等方面。机器人运动学分析主要包括正解和逆…...

【Java 基础】33 JDBC
文章目录 1. 数据库连接1)加载驱动2)建立连接 2. 常见操作1)创建表2)插入数据3)查询数据4)使用 PreparedStatement5)事务管理 3. 注意事项总结 Java Database Connectivity(JDBC&…...

Unity中Shader缩放矩阵
文章目录 前言一、直接相乘缩放1、在属性面板定义一个四维变量,用xyz分别控制在xyz轴上的缩放2、在常量缓存区申明该变量3、在顶点着色器对其进行相乘,来缩放变换4、我们来看看效果 二、使用矩阵乘法代替直接相乘缩放的原理1、我们按如下格式得到缩放矩阵…...

Nessus详细安装-windows (保姆级教程)
Nessus描述 Nessus 是一款广泛使用的网络漏洞扫描工具。它由 Tenable Network Security 公司开发,旨在帮助组织评估其计算机系统和网络的安全性。 Nessus 可以执行自动化的漏洞扫描,通过扫描目标系统、识别和评估可能存在的安全漏洞和弱点。它可以检测…...

Stream流的简单使用
stream流的三类方法 获取Stream流 ○ 创建一条流水线,并把数据放到流水线上准备进行操作中间方法 ○ 流水线上的操作 ○ 一次操作完毕之后,还可以继续进行其他操作终结方法 ○ 一个Stream流只能有一个终结方法 ○ 是流水线上的最后一个操作 其实Stream流非常简单,只…...

智能优化算法应用:基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.蛇优化算法4.实验参数设定5.算法结果6.参考文…...
vue和react diff的详解和不同
diff算法 简述:第一次对比真实dom和虚拟树之间的同层差别,后面为对比新旧虚拟dom树之间的同层差别。 虚拟dom 简述:js对象形容模拟真实dom 具体: 1.虚拟dom是存在内存中的js对象,利用内存的高效率运算。虚拟dom属…...

智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鹈鹕算法4.实验参数设定5.算法结果6.参考文献7.MA…...

10:IIC通信
1:IIC通信 I2C总线(Inter IC BUS) 是由Philips公司开发的一种通用数据总线,应用广泛,下面是一些指标参数: 两根通信线:SCL(Serial Clock,串行时钟线)、SDA&a…...

互联网上门洗衣洗鞋小程序优势有哪些?
互联网洗鞋店小程序相较于传统洗鞋方式,具有以下优势; 1. 便捷性:用户只需通过手机即可随时随地下单并查询,省去了许多不必要的时间和精力。学生们无需走出宿舍或校园,就能轻松预约洗鞋并取件。 2. 精准定位࿱…...
Java中如何优雅地根治null值引起的Bug问题
1. Java对象为null会引发的问题 NullPointerException:当你尝试调用或访问一个null对象的属性或方法时,Java会抛出NullPointerException异常。例如,如果你有一个名为person的变量,它被设置为null,然后你尝试调用perso…...

C# WPF上位机开发(子窗口通知父窗口更新进度)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 这两天在编写代码的时候,正好遇到一个棘手的问题,解决之后感觉挺有意义的,所以先用blog记录一下,后…...

XUbuntu22.04之跨平台容器格式工具:MKVToolNix(二百零三)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…...
vue中的生命周期和VueComponent实例对象
生命周期 生命周期又叫生命周期钩子,生命周期函数 生命周期是,Vue在关键的时刻帮我们调用的一些特殊名字的函数 生命周期的this指向vm或者组件实例对象 mounted会将初始化的Dom挂载到页面上 <template><div class"hello"><…...

XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...

C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...

20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...

基于SpringBoot在线拍卖系统的设计和实现
摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...

iview框架主题色的应用
1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题,无需引入,直接可…...
MySQL 部分重点知识篇
一、数据库对象 1. 主键 定义 :主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 :确保数据的完整性,便于数据的查询和管理。 示例 :在学生信息表中,学号可以作为主键ÿ…...