当前位置: 首页 > news >正文

RocketMQ底层源码解析——事务消息的实现

1. 简介

RocketMQ自身实现了事务消息,可以通过这个机制来实现一些对数据一致性有强需求的场景,保证上下游数据的一致性。
在这里插入图片描述

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

引入RocketMQ之后保证上下游数据的一致性。
在这里插入图片描述
使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

整个事务消息的详细交互流程如下图所示:
在这里插入图片描述
事务消息详细步骤:

  1. 生产者将半事务消息发送至 RocketMQ Broker。
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置。

2. 实战

分别启动:namesrv、broker、consumer、producer

producer示例代码:

public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setNamesrvAddr("127.0.0.1:9876");producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {try {Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}static class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}}
}

消费者示例代码:

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("TopicTest1234", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

将producer和consumer的topic都设置为:TopicTest1234。然后再启动consumer和producer。可以看到TransactionProducer控制台输出日志:
在这里插入图片描述
Consumer控制台输出日志:
在这里插入图片描述

3. 原理分析

分别启动namesrv和broker服务,随后运行:org.apache.rocketmq.example.transaction.TransactionProducer.java,构建出一个TransactionListenerImpl对象之后,添加到TransactionProducer中,为半消息的发送以及本地事务校验做做准备。

核心入口:TransactionProducer#sendMessageInTransaction()

    @Overridepublic TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg) throws MQClientException {if (null == this.transactionListener) {throw new MQClientException("TransactionListener is null", null);}// 包装Topic,判断是否是重试Topic以及DLQ的Topicmsg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));// 发送事务消息return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);}

DefaultMQProducerImpl#sendMessageInTransaction()

    public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {// 获取事务监听器,就程序开头的时候传入的TransactionListenerImpl对象TransactionListener transactionListener = getCheckListener();// 新版本推荐事务监听器,已不使用localTransactionExecuterif (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// 设置DelayTimeLevel 参数if (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}// 消息校验Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;// 设置属性:TRAN_MSG为trueMessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");// 设置属性:PGROUP为producerGroupMessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {// ① 发送半消息sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {case SEND_OK: {try {// 事务ID不为空if (sendResult.getTransactionId() != null) {// 设置事务idmsg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) {// 执行本地事务检查localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");// ② 执行本地事务检查                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {// ③ 调用endTransaction判断是否结束事务this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}// 构建事务消息发送结果TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;}

在该方法中核心步骤分别位于①②③标识处。

①:调用链路为最终抵达:DefaultMQProducerImpl#sendDefaultImpl(),也就是producer发消息核心方法。
在sendDefaultImpl()中有一处比较关键的代码:

				// 事务final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}

此处代码给这条消息标识上事务消息的标志,最后通过mQClientAPIImpl往broker发送一条RPC的请求。

Broker端接受到请求之后经过请求code的分发,这条请求将由SendMessageProcessor#asyncSendMessage进行处理。

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 相关代码省略...// 事务消息标识,从请求头中获取String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事务消息处理putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 将消息内容存储到CommitLog文件putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

asyncSendMessage方法处会从消息属性中获取到transFlag标识,判断到这条消息是一条半消息,然后调用this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner)。

方法调用链最终来到TrasactionalMessageBridge#asyncPutHalfMessage(),

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {return store.asyncPutMessage(parseHalfMessageInner(messageInner));}
   /*** 消息事务半消息* @param msgInner* @return*/private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {// 设置真正要发送的TopicMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());// 设置真正要发送的queueIdMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));// 设置半消息TopicmsgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());// 设置半消息queueIdmsgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner;}

parseHalfMessageInner方法做了两件事,将这条半消息真正要发送的Topic以及queueId另外存起来,然后将这条半消息要发送到指定存储半消息的的RMQ_SYS_TRANS_HALF_TOPIC中,然后将queueId设置为0。(存储半消息的Topic只有1个queueId)
最后将这条半消息存入到CommitLog中,则步骤①执行完成。

② 步骤①发送半消息到CommitLog中存储之后,此时消息发送结果为SEND_OK,接着将调用TransactionListener#executeLocalTransaction()方法,检查本地事务的状态,此处由开发者自行实现代码逻辑。

③ 检查本地事务结束之后,会调用endTransaction()方法来尝试结束此次的事务消息。

public void endTransaction(final Message msg,final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {final MessageId id;if (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}// 获取消息事务idString transactionId = sendResult.getTransactionId();// 获取broker地址final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);// 设置消息comimtlog偏移量requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {case COMMIT_MESSAGE:	// 提交消息requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:	// 回滚消息requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW:	// 未知状态requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 往broker发送endTransactionOneway请求this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());}

broker端EndTransactionProcessor#processRequest接收到endTransaction请求:

@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.debug("Transaction request:{}", requestHeader);if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");return response;}// 判断是事务check类型if (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, but it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {// 这里提交requestHeader,实际是从commitlog中获取halfMessageresult = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 检查halfMessage状态RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// ④ 半消息处理成功之后,调用endMessageTransaction()发起事务消息结束MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());// 清楚事务标识MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);// ⑤ 发送真正的完整的消息RemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {// ⑦ 删除半消息this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {// ⑥ 发起消息回滚,获取要回滚的半消息result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 查询半消息RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// ⑦  删除半消息this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response;}

④ 调用endMessageTransaction

private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {MessageExtBrokerInner msgInner = new MessageExtBrokerInner();// 取出真正要发送是TopicmsgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));// 取出真正要发送的queueidmsgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());msgInner.setWaitStoreMsgOK(false);msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));msgInner.setSysFlag(msgExt.getSysFlag());TopicFilterType topicFilterType =(msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG: TopicFilterType.SINGLE_TAG;long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());msgInner.setTagsCode(tagsCodeValue);MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);return msgInner;}

该方法将这条半消息设置成真正发送的Topic以及queueId。

⑤ 调用EndTransactionProcessor#sendFinalMessage(),发送真正的消息。最终调用MessageStore#putMessage()将该条消息存入CommitLog中。

⑥ 调用TransactionalMessageServiceImpl#rollbackMessage()对半消息进行回滚,但该方法实则是读取到半消息,然后再检查这条半消息。

⑦ 发送真正的消息或者是回滚半消息成功之后,随后调用TransactionalMessageServiceImpl#deletePrepareMessage()删除半消息。

经过①~⑦的步骤,整个RocketMQ的事务消息流程也就结束了,但是这里有一个关键点还没有讲解,当半消息发送成功了,本地事务执行成功发送本地事务状态时发生了broker断电或者是本地事务状态没有发送成功时,该如何保证整个流程能够正常运行呢?答案就是broker端会在启动时启动一个定时任务区检查本地事务的状态,也就是方法:TransactionListener#checkLocalTransaction()。

也就是Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。此外,需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。

参考

  • RocketMQ官网-事务消息发送

相关文章:

RocketMQ底层源码解析——事务消息的实现

1. 简介 RocketMQ自身实现了事务消息&#xff0c;可以通过这个机制来实现一些对数据一致性有强需求的场景&#xff0c;保证上下游数据的一致性。 以电商交易场景为例&#xff0c;用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统…...

学习802.11之MAC帧格式(一篇就够!)

802.11规范的关键在于MAC&#xff08;媒介访问控制层&#xff09;&#xff0c;MAC位于各式物理层之上&#xff0c;控制数据传输。负责核心成帧操作以及与有线骨干网络之间的交互。 802.11 MAC采用载波监听多路访问&#xff08;CSMA&#xff09;机制来控制对传输媒介的访问&…...

使用阿里云IoT Studio建立物模型可视化界面

使用阿里云IoT Studio建立物模型可视化界面 上一篇文章介绍了如何使用ESP-01S上报数据到物模型&#xff1a;https://blog.csdn.net/weixin_46251230/article/details/128996719 这次使用阿里云IoT Studio建立物模型的Web页面 阿里云IoT Studio&#xff1a; https://studio.i…...

HBase 复习 ---- chapter07

HBase 复习 ---- chapter07部署 HBase&#xff08;运维&#xff09; 1&#xff1a;部署 HBase 实际是部署了三个技术&#xff08;hadoop zookeeper hbase&#xff09; hadoop hdfs mapreduce common hdfs namenode datanode secondaryNamenode yarn ResourceManager&a…...

跟我一起写Makefile--个人总结

此篇笔记是根据陈皓大佬《跟我一起写Makefile》学习所得 文章目录换行符clean变量make的自动推导另类风格的Makefile清空目标文件的规则cleanMakefile总述显示规则隐晦规则变量的定义注释引用其它的Makefile环境变量MAKEFILESmake的工作方式书写规则规则举例规则的语法在规则中…...

设计模式之为什么要学好设计模式

目录1 回顾软件设计原则2 设计模式总览3 经典框架都在用设计模式解决问题1 回顾软件设计原则 不用设计模式并非不可以&#xff0c;但是用好设计模式能帮助我们更好地解决实际问题&#xff0c;设计模式最重要的是解耦。设计模式天天都在用&#xff0c;但自己却无感知。我们把设…...

大数据时代的小数据神器 - asqlcell

自从Google发布了经典的MapReduce论文&#xff0c;以及Yahoo开源了Hadoop的实现&#xff0c;大数据这个词就成为了一个行业的热门。在不断提高的机器性能和各种层出不穷的工具框架加持下&#xff0c;数据分析开始从过去的采样抽查变成全量整体&#xff0c;原先被抽样丢弃的隐藏…...

【呕心沥血】整理全栈自动化测试技术(三):如何编写技术方案

前面两篇笔记我介绍了自动化测试前期调研注意事项和前置准备阶段切入点&#xff0c;有同学在后台提问&#xff1a; “做完前期的调研和准备工作&#xff0c;领导要求写一个落地方案并评审&#xff0c;自动化测试的落地方案该怎么写”&#xff1f; 首先这个要求我觉得挺正常&a…...

67. 二进制求和

文章目录题目描述竖式模拟转换为十进制计算题目描述 给你两个二进制字符串 a 和 b &#xff0c;以二进制字符串的形式返回它们的和。 示例 1&#xff1a; 输入:a “11”, b “1” 输出&#xff1a;“100” 示例 2&#xff1a; 输入&#xff1a;a “1010”, b “1011” …...

1555数列极差(队列 优先队列 )

目录 题目描述 解题思路 代码部分 题目描述 在黑板上写了N个正整数作成的一个数列&#xff0c;进行如下操作&#xff1a;每一次擦去其中的两个数a和b&#xff0c;然后在数列中加入一个数a*b1&#xff0c;如此下去直至黑板上剩下一个数&#xff0c;在所有按这种操作方式最后得…...

代码随想录算法训练营第二十七天 | 93.复原IP地址,78.子集,90.子集II

一、参考资料复原IP地址题目链接/文章讲解&#xff1a;https://programmercarl.com/0093.%E5%A4%8D%E5%8E%9FIP%E5%9C%B0%E5%9D%80.html 视频讲解&#xff1a;https://www.bilibili.com/video/BV1XP4y1U73i/子集题目链接/文章讲解&#xff1a;https://programmercarl.com/0078.…...

jvm类加载器

概念 Bootstarp ClassLoader (引导类加载器) 加载String等核心的类Ext ClassLoader (拓展类加载器)System ClassLoader (系统类加载器) 加载用户自定义的类 关系 BootstrapClassLoader 包含 ExtClassLoaderExtClassLoader 包含 SystemClassLoader彼此是包含关系&#xff0c;不…...

Rust学习入门--【7】Rust 数据类型

类型系统 对于任何一门语言都是重中之重&#xff0c;因为它体现了语言所支持的不同类型的值。 类型系统 也是 IT 初学者最难啃的三座大山之一&#xff0c;而类型系统之所以难以理解&#xff0c;主要是没有合适的现成的参考体系。 我们说类型系统 存在的目的&#xff0c;就是 …...

阅读MySQL必知必会,查缺补漏

MySQL自带数据库 information_schema&#xff1a;是MySQL自带的数据库&#xff0c;主要保持MySQL数据库服务器的系统信息&#xff0c;比如数据库的名称&#xff0c;数据库表的名称&#xff0c;字段名称&#xff0c;存储权限等。 performance_schema&#xff1a;是MySQL系统自…...

MySQL数据库10——多表连接查询

数据如果在多个表里面&#xff0c;需要进行连接查询。 一般在pandas里面merge合并会用到一个索引&#xff0c;按这个索引的规则进行合并叫做有规则的等值连接。若不按规则连接&#xff0c;遍历两两组合的所有可能性&#xff0c;叫做笛卡尔积。 笛卡尔积连接 通常人们都会设置…...

华为OD机试 - 括号检查(Python)| 真题含思路

括号检查 题目 现有一字符串 仅由 (,),{,},[,] 六种括号组成,若字符串满足以下条件之一,则为无效字符串 任意类型的左右括号数量不相等 存在未按正确顺序(先左后右)闭合的括号, 输出括号的最大嵌套深度 若字符串无效则输出 0 0 <= 字符串长度 <= 100000 输入 一个只…...

安全渗透测试中的一款免费开源的超级关键词URL采集工具

安全渗透测试中的一款免费开源的超级关键词URL采集工具。 #################### 免责声明&#xff1a;工具本身并无好坏&#xff0c;希望大家以遵守《网络安全法》相关法律为前提来使用该工具&#xff0c;支持研究学习&#xff0c;切勿用于非法犯罪活动&#xff0c;对于恶意使…...

数据资产管理实践白皮书(6.0版)解读

目录 第一章数据资产管理概述 ( 一 ) 数据资产管理和数据要素的关系...

c/c++开发,无可避免的函数指针使用案例

一、函数指针简介 函数指针是指指向函数而非指向对象的指针。像其他指针一样&#xff0c;函数指针也指向某个特定的类型。函数类型由其返回类型以及形参表确定&#xff0c;而与函数名无关。例如&#xff1a; char* (*pf1)(char * p1,char *p2); 这是一个函数指针&#xff0c;其…...

QT(12)-QThreadPool

1 简介 QThreadPool是Qt框架中的一个类&#xff0c;提供了一组工作线程池。该线程池自动管理一组工作线程&#xff0c;在线程可用时分配任务。使用线程池的主要优点是&#xff0c;它可以减少创建和销毁线程的开销&#xff0c;因为可以重复使用线程。 线程池设计用于场景中&am…...

【Java|golang】1138. 字母板上的路径

我们从一块字母板上的位置 (0, 0) 出发&#xff0c;该坐标对应的字符为 board[0][0]。 在本题里&#xff0c;字母板为board [“abcde”, “fghij”, “klmno”, “pqrst”, “uvwxy”, “z”]&#xff0c;如下所示。 我们可以按下面的指令规则行动&#xff1a; 如果方格存…...

Flink 1.14从简单到源码第三讲

文章目录 1.flink多流操作Api1.1split 分流操作1.2.侧输出流1.3.connect 连接操作1.4.union 操作1.5 coGroup 协同分组1.6 join1.7 broadcast 广播2.process3.并行度和Api3.1 任务提交简单流程3.2 task与算子链4. Flink 时间相关(窗口计算)4.1时间语义(窗口计算)4.2 新版api指定…...

淘宝API接口系列,获取购买到的商品订单列表,卖出的商品订单列表,订单详情,订单物流,买家信息,收货地址列表,买家token

custom自定义API操作buyer_order_list获取购买到的商品订单列表buyer_order_detail获取购买到的商品订单详情buyer_order_express获取购买到的商品订单物流buyer_address_list收货地址列表buyer_address_add添加收货地址buyer_info买家信息buyer_token买家tokenseller_order_li…...

ucos-ii 的任务调度原理和实现

ucosii 任务调度和原理1、ucos-ii 任务创建与任务调度 1.1、任务的创建 当你调用 OSTaskCreate( ) 进行任务的创建的时候&#xff0c;会初始化任务的堆栈、保存cpu的寄存器、创建任务的控制块&#xff08;OS_TCB&#xff09;等的操作&#xff1b; if (OSTCBPrioTbl[prio] (OS_…...

Solon2 开发之容器,七、切面与函数环绕拦截

想要环绕拦截一个 Bean 的函数。需要三个前置条件&#xff1a; 通过注解做为“切点”&#xff0c;进行拦截&#xff08;不能无缘无故给拦了吧&#xff1f;费性能&#xff09;Bean 的 method 是被代理的在 Bean 被扫描之前&#xff0c;完成环绕拦截的注册 1、定义切点和注册环…...

代码随想录第十天(28)

文章目录28. 找出字符串中第一个匹配项的下标看答案KMPnext数组&#xff08;前缀表&#xff09;最长公共前后缀如何计算前缀表前缀表与next数组时间复杂度分析28. 找出字符串中第一个匹配项的下标 莫得思路……好久没做题&#xff0c;都已经忘得差不多了 看答案 其实就是自己…...

循环队列来了解一下!!

笔者在之前的一篇文章&#xff0c;详细的介绍了&#xff1a;队列之单向链表与双向链表的模拟实现&#xff1a;https://blog.csdn.net/weixin_64308540/article/details/128742090?spm1001.2014.3001.5502 感兴趣的各位老铁&#xff0c;可以参考一下啦&#xff01;下面进入循环…...

Idea打包springboot项目war包,测试通过

pom.xml文件 <!--包名以及版本号&#xff0c;这个是打包时候使用&#xff0c;版本可写可不写&#xff0c;建议写有利于维护系统--> <artifactId>tsgdemo</artifactId> <version>0.0.1-SNAPSHOT</version> <!--打包形式--> <packaging&…...

python+django高校师生健康信息管理系统pycharm

管理员功能模块 4.1登录页面 管理员登录&#xff0c;通过填写注册时输入的用户名、密码、角色进行登录&#xff0c;如图所示。 4.2系统首页 管理员登录进入师生健康信息管理系统可以查看个人中心、学生管理、教师管理、数据收集管理、问卷分类管理、疫情问卷管理、问卷调查管理…...

CUDA中的流序内存分配

文章目录CUDA中的流序内存分配1. Introduction2. Query for Support3. API Fundamentals (cudaMallocAsync and cudaFreeAsync)4. Memory Pools and the cudaMemPool_t注意&#xff1a;设备的内存池当前将是该设备的本地。因此&#xff0c;在不指定内存池的情况下进行分配将始终…...

济南网站排名推广/企业营销

前言&#xff1a; 在前两篇的文章中&#xff0c;我们学会了给组件添加属性、事件&#xff0c;以及对这些属性和事件进行描述添加&#xff0c;今天&#xff0c;我们就来小试一把这个组件吧&#xff0c;如果你忘记了前两篇文章的内容&#xff0c;可以从这里回顾一下&#xff1a; …...

教育培训的网站建设/软文是什么

作为一个从事.NET Web技术的开发人员&#xff0c;似乎没有什么理由可以不懂微软自己的AJAX框架&#xff0c;虽然它可能不太好用&#xff0c;或者用起来没有像jQuery这样的框架那么爽。我没有怎么用过UpdatePanel来做过复杂的东西&#xff0c;所以对于这个的优缺点就不予置评了。…...

深一网站建设/十大免费无代码开发软件

早晨起床时间&#xff1a;12:30 晚上休息时间&#xff1a;0:28 今日总结&#xff1a;休息中。...

日记类型 wordpress/一键优化清理加速

一、背景介绍 在我们日常使用Kali Linux时&#xff0c;我们通常在进行安全演练的时候&#xff0c;当我们拿下Windows靶机&#xff08;例如利用永恒之蓝拿下Win7主机&#xff09;后在命令行模式下如何进行文件下载以及文件上传呢&#xff1f;如何解决上述问题呢&#xff1f;接下…...

网站改版 权重/创建网址链接

package com.kk.innerClass;/*** 通过内部类实现接口* 解决多个接口中方法重名问题**/interface Machine {void run();}class Person {void run() {System.out.println("person start");}}public class Android extends Person {private class MachineHeart implemen…...

湖北省城乡建设厅网站/网络营销推广的方式有哪些

官方下载地址是&#xff1a;http://www.xdp.it/cximage/ 打开工程后可以看到下例这些工程&#xff1a; - CxImage - CxImageCrtDll - CxImageMfcDll - dome - domeDll - jasper - jbig - jpeg - libdcr - mng - png - tiff - zlib 1。首先我们要确定在程序中是希望静态链接还是…...