当前位置: 首页 > news >正文

Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息

  • 现在做一个RocketMQ的事务消息的 demo

1 )生产者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)// 自定义结构体,为了实现 NewTransactionProducer 第一个参数的接口
type MyListener struct{}func (hl MyListener) ExecuteLocalTransaction(*primitive.Message) primitive.LocalTransactionState {return primitive.CommitMessageState
}func (hl MyListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {return primitive.CommitMessageState
}func main() {// ------------ 1. 连接RocketMQ ------------------------mqAddr := "127.0.0.1:9876" // 模拟地址// NewTransactionProducer 这个方法第一个参数是一个 Listener// 是一个接口,需要一个接口体去实现它的方法p, err := rocketmq.NewTransactionProducer( // 开启事物消息生产者MyListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err) // 生产环境禁用panic}// ------------ 2. 启动RocketMQ ------------------------err = p.Start()if err != nil {panic(err)}// ------------ 3. 发送RocketMQ 消息 ------------------------res, err := p.SendMessageInTransaction(context.Background(),primitive.NewMessage("MyTransactionTopic", []byte("xxxxxxxxxxxyyyyyyyyyyyyyzzzzzzzzzzzz")),)fmt.Println(res.Status)if err != nil {panic(err)}fmt.Printf("发送成功")time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
}
  • 可见, primitive.LocalTransactionState 是返回值
  • 进入这个包中,它有三个状态
    • CommitMessageState 提交状态
    • RollbackMessageState 回滚状态
    • UnknowState 未知状态
  • 在后续,这三个状态都可以再试一试

2 ) 运行后,在UI界面查看消息


2.1 输出信息如下

INFO[0000] change the route for clients                 
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-bbs\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-bbs\",\"brokerAddrs\":{\"0\":\"192.168.124.6:10911\"}}]}" changedFrom="<nil>" topic=MyTransactionTopic
0
发送成功

2.2 运行效果

2 )消费者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {mqAddr := "127.0.0.1:9876"topic := "MyTransactionTopic"groupName := "ddddddd"c, err := rocketmq.NewPushConsumer(consumer.WithGroupName(groupName),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),)if err != nil {panic(err)}err = c.Subscribe(topic, consumer.MessageSelector{},func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgList {fmt.Printf("订阅消息,消费%v \n", msgList[i])}return consumer.ConsumeSuccess, nil})if err != nil {fmt.Println("消费消息错误: %v", err.Error())}err = c.Start()if err != nil {fmt.Println("开启消费这错误: %v", err.Error())}time.Sleep(time.Hour)err = c.Shutdown()if err != nil {fmt.Println("shutdown消费者错误: %v", err.Error())}
}
  • 在RocketMQ中,事务消息的处理机制涉及到生产者和消费者两端的协作,但与普通消息消费模式有所区别

  • 事务消息的消费端并不直接参与到事务的两阶段提交过程中,它更像是一个“半事务消息”的确认者

  • 具体流程如下:

    • 生产者发送事务消息:生产者发送一条半事务消息到MQ服务器,并立即返回,此时消息处于“Prepare”状态
    • MQ Server回调生产者确认:MQ服务器会回调生产者提供的事务监听器(在Go示例中是HappyListener),执行本地事务。生产者需在此阶段执行事务操作并决定是提交还是回滚该消息
    • 生产者根据本地事务结果告知MQ Server:生产者根据本地事务执行结果,通过事务状态检查接口告诉MQ服务器是提交还是回滚这条半事务消息。
    • 消息变为可消费状态:MQ服务器根据生产者的决定,将消息标记为Commit或Rollback,Commit后的消息才对普通消费者可见
  • 因此,对于事务消息的消费者来说,其主要职责是消费那些已经被事务提交成功的消息,而不需要直接参与事务的提交或回滚过程

  • 消费者代码看起来与普通消息的消费者相似,但消费的消息实际上是生产者已经提交成功的事务消息

  • 不过,如果您的需求是希望消费者也以某种形式参与到事务的最终确认中,比如基于消息的消费结果来决定是否提交事务,这在RocketMQ的标准事务消息模型中并不直接支持

  • RocketMQ的事务模型主要关注于保证消息生产和本地事务的原子性,消费者更多的是作为事务结果的后续处理者角色

  • 在提供的消费者示例中,尽管它看起来是一个普通的消费者,但实际上它处理的是生产者通过事务消息流程提交后的内容,这符合事务消息的消费逻辑

  • 如果需要在消费端实现更复杂的逻辑来间接响应事务状态,可能需要结合业务系统进行额外的设计,比如通过监听数据库状态变化、消息队列的死信队列特性或其他补偿机制来处理未决事务

延迟性事务消息

  • 您需要在创建消息时指定消息的延迟等级,而不是在生产者配置或消息发送后进行延迟
  • RocketMQ支持多种延迟等级,每种等级对应不同的延迟时间
  • 注意,事务消息和延迟消息的直接组合在RocketMQ中并不是直接支持的特性
  • 因为事务消息的设计主要是围绕两阶段提交模型,确保消息发送与本地事务的一致性
  • 而延迟消息侧重于消息的定时投递
  • 然而,可以通过间接的方式结合这两个特性,即在事务消息的本地事务逻辑中包含对延迟操作的处理
  • 下面的示例尝试模拟一种结合方式,但请注意,这仅是一种逻辑上的结合,实际应用中需要根据具体业务场景仔细设计和测试
  • 方案思路:
    • 生产者:发送一个事务消息到特定的主题(例如DelayedTransactionTopic),该消息体中携带了需要进行延迟处理的信息。
    • 事务监听器:在ExecuteLocalTransaction方法中,不直接执行长时间的延迟逻辑,而是执行快速操作(如记录消息待处理状态或存入DB),然后返回primitive.Prepared状态。
    • 检查事务状态:在CheckLocalTransaction方法中,检查事务状态,如果需要,触发一个异步任务或消息队列中的消息,该消息携带延迟处理逻辑和真正的延迟时间。
    • 延迟处理服务:这个服务从队列中取出消息并根据消息中的指示进行真正的延迟操作,例如通过内部队列或定时任务系统(如分布式定时任务框架)来实现延迟执行
  • 相关生产者伪代码未完全实现,仅供参考
    package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
    )type DelayedTxListener struct{}func (d DelayedTxListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {// 假设这里将消息标识为待处理,并记录相关信息到数据库fmt.Println("Preparing transaction, storing message meta...")return primitive.Prepared
    }func (d DelayedTxListener) CheckLocalTransaction(msgExt *primitive.MessageExt) primitive.LocalTransactionState {// 在这里检查消息是否准备好执行延迟操作// 实际操作可能包括从数据库查询该消息的状态// 假设我们已经确定需要进行延迟操作,这里直接模拟提交return primitive.CommitMessageState
    }func main() {mqAddr := "127.0.0.1:9876"p, err := rocketmq.NewTransactionProducer(DelayedTxListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err)}err = p.Start()if err != nil {panic(err)}msg := &primitive.Message{Topic: "DelayedTransactionTopic",Body:  []byte("消息内容,可以包含延迟处理的详细信息"),}res, err := p.SendMessageInTransaction(context.Background(), msg)if err != nil {panic(err)}fmt.Printf("发送事务消息状态: %v\n", res.Status)time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
    }
    
  • 注意事项
    • 这个示例主要是概念性的,展示了如何在事务消息的上下文中计划后续的延迟处理步骤,而没有直接实现延迟消息的发送。
    • 实际应用中,您可能需要实现一个额外的后台服务或消息队列来处理这些“计划”好的延迟任务,确保它们能在预定时间得到执行。
    • 事务消息的两阶段提交机制仍然适用,只是延迟操作本身不在RocketMQ的直接事务控制范围内,而是作为一种业务逻辑上的后续处理。
    • 务必根据您的具体业务需求和RocketMQ的版本特性,仔细设计和测试这样的解决方案。

相关文章:

Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息 现在做一个RocketMQ的事务消息的 demo 1 &#xff09;生产者 package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/prim…...

【go】go初始化命令总结

包初始化 test项目目录下执行 go mod init test go mod tidy生成二进制可执行文件 go build -o test .\main.go...

vue音乐播放条

先看效果 再看代码 <template><div class"footer-player z-30 flex items-center p-2"><div v-if"isShow" class"h-12 w-60 overflow-hidden"><div :style"activeStyle" class"open-detail-control-wrap&…...

halcon实现浓淡补正,中间值补正-抽取暗

代码效果 抽取前 中值抽取暗 halcon函数代码 测试图片参数 NoiseCut:16 Gain:1 输入ImagePart NoiseCut Gain *获取直方图 get_domain (ImagePart, Domain) gray_histo_range(Domain,ImagePart,0,255,256, Histo, BinSize) area_center(Domain, NumPixels, Row, Column) …...

太速科技-FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡

FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡 一、板卡概述 该板卡为了考虑兼容1.8V电平IO&#xff0c;适配Virtex7&#xff0c;Kintex Ultrascale&#xff0c;Virtex ultrasacle FPGA而特制&#xff0c;如果要兼容原来的3.3V 也可以修改硬件参数。板卡支持1路…...

GPU短缺和模型效率的推动

1. 引言 随着全球GPU短缺和云计算成本的不断上升&#xff0c;开发更高效的AI模型成为了当前的焦点。技术如低秩适应&#xff08;LoRA&#xff09;和量化&#xff08;Quantization&#xff09;在优化性能的同时&#xff0c;减少了资源需求。这些技术不仅在当前的AI开发中至关重…...

linux在文件夹中查找文件内容

linux在文件夹中查找文件内容 在Linux中,可以通过以下多个途径,在文件夹中查找文件内容: 1、使用grep命令: grep -r "要查找的内容" /path/to/folder-r参数表示递归地在文件夹及其子文件夹中搜索。/path/to/folder是要搜索的文件夹路径。2、使用ack命令 ack …...

算法:11. 盛最多水的容器

11. 盛最多水的容器 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你…...

Hazelcast 分布式缓存 在Seatunnel中的使用

1、背景 最近在调研seatunnel的时候&#xff0c;发现新版的seatunnel提供了一个web服务&#xff0c;可以用于图形化的创建数据同步任务&#xff0c;然后管理任务。这里面有个日志模块&#xff0c;可以查看任务的执行状态。其中有个取读数据条数和同步数据条数。很好奇这个数据…...

分数限制下,选好专业还是选好学校?

目录 分数限制下&#xff0c;选好专业还是选好学校&#xff1f; 方向一&#xff1a;专业解析 1. 专业选择的重要性 2. 不同专业的优势与挑战 3. 个人专业选择经验分享 4. 实际场景下的“专业VS学校”选择方案 方向二&#xff1a;名校效应分析 1. 名校声誉与品牌效应 2…...

软件改为开机自启动

1.按键 win R,输入“shell:startup”命令, 然后就可以打开启动目录了&#xff0c;如下&#xff1a; 2.然后&#xff0c;把要开机启动的程序的图标拖进去即可。 参考&#xff1a;开机启动项如何设置...

集群down机的应急和恢复测试(非重做备机)

1. 集群的两台服务器的状态 实例 正常情况主备 ip 端口 node1 主机 192.168.6.6 9088 node2 备机 192.168.6.7 9088 2. 测试的步骤 down掉node1观察node2的状态在node2未自动切换的时候手动将node2调整为单机状态&#xff0c;模拟紧急使用模拟不紧急时&#xff0…...

【数据库系统概论复习】关系数据库与关系代数笔记

文章目录 基本概念数据库基本概念关系数据结构完整性约束 关系代数关系代数练习课堂练习 语法树 基本概念 数据库基本概念 DB 数据库&#xff0c; 为了存用户的各种数据&#xff0c;我们要建很多关系&#xff08;二维表&#xff09;&#xff0c;所以把相关的关系&#xff08;二…...

赛氪网受邀参加上海闵行区翻译协会年会,共探科技翻译创新之路

在科技飞速发展的时代背景下&#xff0c;翻译行业正面临着前所未有的机遇与挑战。作为连接高校、企业与社会的桥梁&#xff0c;赛氪网在推动翻译创新、促进学术交流方面展现出了独特的魅力。2024年6月9日&#xff0c;在华东师范大学外语学院举办的第十三届上海市闵行区翻译协会…...

项目管理进阶之EVM(挣值管理)

前言 项目管理进阶系列&#xff0c;终于有时间更新啦&#xff01;&#xff01;&#xff01;欢迎持续关注哦~ 上一节博主重点讲了一个环&#xff1a;PDCA&#xff0c;无论各行各业&#xff0c;上到航空航天、下到种地种菜&#xff0c;都离不开对质量的监督和改进。这个环既是一…...

PLSQL、Oracle以及客户端远程连接服务器笔记(仅供参考)

1.PLSQL参考链接&#xff1a; 全网最全最细的PLSQL下载、安装、配置、使用指南、问题解答&#xff0c;相关问题已汇总-CSDN博客文章浏览阅读2.9w次&#xff0c;点赞98次&#xff0c;收藏447次。双击之后&#xff0c;这里选择安装目录&#xff0c;你安装目录选的哪里&#xff0…...

Win快速删除node_modules

在Windows系统上删除 node_modules 文件夹通常是一个缓慢且耗时的过程。这主要是由于几个关键因素导致的&#xff1a; 主要原因 文件数量多且嵌套深&#xff1a; node_modules 文件夹通常包含成千上万的子文件夹和文件。由于其结构复杂&#xff0c;文件和文件夹往往嵌套得非常…...

【机器学习】基于顺序到顺序Transformer机器翻译

引言 1.1 序列到序列模型详解 序列到序列(Seq2Seq)模型是深度学习中处理序列数据转换问题的关键架构。在自然语言处理(NLP)任务中&#xff0c;如机器翻译、文本摘要和聊天机器人等&#xff0c;Seq2Seq模型能够高效地将输入序列转换为期望的输出序列。 模型架构&#xff1a; 编…...

TEA 加密的 Java 实现

import java.nio.ByteBuffer; import java.nio.ByteOrder;public class TeaUtils {private static final int DELTA 0x9E3779B9;private static final int ROUND 32;private static final String KEY "password";/*** 加密字符串&#xff0c;使用 TEA 加密算法*/p…...

鸿蒙开发电话服务:【@ohos.telephony.data (蜂窝数据)】

蜂窝数据 说明&#xff1a; 本模块首批接口从API version 7开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import data from ohos.telephony.data;data.getDefaultCellularDataSlotId getDefaultCellularDataSlotId(callback: Async…...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

服务器硬防的应用场景都有哪些?

服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式&#xff0c;避免服务器受到各种恶意攻击和网络威胁&#xff0c;那么&#xff0c;服务器硬防通常都会应用在哪些场景当中呢&#xff1f; 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

css的定位(position)详解:相对定位 绝对定位 固定定位

在 CSS 中&#xff0c;元素的定位通过 position 属性控制&#xff0c;共有 5 种定位模式&#xff1a;static&#xff08;静态定位&#xff09;、relative&#xff08;相对定位&#xff09;、absolute&#xff08;绝对定位&#xff09;、fixed&#xff08;固定定位&#xff09;和…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)

上一章用到了V2 的概念&#xff0c;其实 Fiori当中还有 V4&#xff0c;咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务)&#xff0c;代理中间件&#xff08;ui5-middleware-simpleproxy&#xff09;-CSDN博客…...

AI,如何重构理解、匹配与决策?

AI 时代&#xff0c;我们如何理解消费&#xff1f; 作者&#xff5c;王彬 封面&#xff5c;Unplash 人们通过信息理解世界。 曾几何时&#xff0c;PC 与移动互联网重塑了人们的购物路径&#xff1a;信息变得唾手可得&#xff0c;商品决策变得高度依赖内容。 但 AI 时代的来…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...