当前位置: 首页 > news >正文

Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息

  • 现在做一个RocketMQ的事务消息的 demo

1 )生产者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)// 自定义结构体,为了实现 NewTransactionProducer 第一个参数的接口
type MyListener struct{}func (hl MyListener) ExecuteLocalTransaction(*primitive.Message) primitive.LocalTransactionState {return primitive.CommitMessageState
}func (hl MyListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {return primitive.CommitMessageState
}func main() {// ------------ 1. 连接RocketMQ ------------------------mqAddr := "127.0.0.1:9876" // 模拟地址// NewTransactionProducer 这个方法第一个参数是一个 Listener// 是一个接口,需要一个接口体去实现它的方法p, err := rocketmq.NewTransactionProducer( // 开启事物消息生产者MyListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err) // 生产环境禁用panic}// ------------ 2. 启动RocketMQ ------------------------err = p.Start()if err != nil {panic(err)}// ------------ 3. 发送RocketMQ 消息 ------------------------res, err := p.SendMessageInTransaction(context.Background(),primitive.NewMessage("MyTransactionTopic", []byte("xxxxxxxxxxxyyyyyyyyyyyyyzzzzzzzzzzzz")),)fmt.Println(res.Status)if err != nil {panic(err)}fmt.Printf("发送成功")time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
}
  • 可见, primitive.LocalTransactionState 是返回值
  • 进入这个包中,它有三个状态
    • CommitMessageState 提交状态
    • RollbackMessageState 回滚状态
    • UnknowState 未知状态
  • 在后续,这三个状态都可以再试一试

2 ) 运行后,在UI界面查看消息


2.1 输出信息如下

INFO[0000] change the route for clients                 
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-bbs\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-bbs\",\"brokerAddrs\":{\"0\":\"192.168.124.6:10911\"}}]}" changedFrom="<nil>" topic=MyTransactionTopic
0
发送成功

2.2 运行效果

2 )消费者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {mqAddr := "127.0.0.1:9876"topic := "MyTransactionTopic"groupName := "ddddddd"c, err := rocketmq.NewPushConsumer(consumer.WithGroupName(groupName),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),)if err != nil {panic(err)}err = c.Subscribe(topic, consumer.MessageSelector{},func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgList {fmt.Printf("订阅消息,消费%v \n", msgList[i])}return consumer.ConsumeSuccess, nil})if err != nil {fmt.Println("消费消息错误: %v", err.Error())}err = c.Start()if err != nil {fmt.Println("开启消费这错误: %v", err.Error())}time.Sleep(time.Hour)err = c.Shutdown()if err != nil {fmt.Println("shutdown消费者错误: %v", err.Error())}
}
  • 在RocketMQ中,事务消息的处理机制涉及到生产者和消费者两端的协作,但与普通消息消费模式有所区别

  • 事务消息的消费端并不直接参与到事务的两阶段提交过程中,它更像是一个“半事务消息”的确认者

  • 具体流程如下:

    • 生产者发送事务消息:生产者发送一条半事务消息到MQ服务器,并立即返回,此时消息处于“Prepare”状态
    • MQ Server回调生产者确认:MQ服务器会回调生产者提供的事务监听器(在Go示例中是HappyListener),执行本地事务。生产者需在此阶段执行事务操作并决定是提交还是回滚该消息
    • 生产者根据本地事务结果告知MQ Server:生产者根据本地事务执行结果,通过事务状态检查接口告诉MQ服务器是提交还是回滚这条半事务消息。
    • 消息变为可消费状态:MQ服务器根据生产者的决定,将消息标记为Commit或Rollback,Commit后的消息才对普通消费者可见
  • 因此,对于事务消息的消费者来说,其主要职责是消费那些已经被事务提交成功的消息,而不需要直接参与事务的提交或回滚过程

  • 消费者代码看起来与普通消息的消费者相似,但消费的消息实际上是生产者已经提交成功的事务消息

  • 不过,如果您的需求是希望消费者也以某种形式参与到事务的最终确认中,比如基于消息的消费结果来决定是否提交事务,这在RocketMQ的标准事务消息模型中并不直接支持

  • RocketMQ的事务模型主要关注于保证消息生产和本地事务的原子性,消费者更多的是作为事务结果的后续处理者角色

  • 在提供的消费者示例中,尽管它看起来是一个普通的消费者,但实际上它处理的是生产者通过事务消息流程提交后的内容,这符合事务消息的消费逻辑

  • 如果需要在消费端实现更复杂的逻辑来间接响应事务状态,可能需要结合业务系统进行额外的设计,比如通过监听数据库状态变化、消息队列的死信队列特性或其他补偿机制来处理未决事务

延迟性事务消息

  • 您需要在创建消息时指定消息的延迟等级,而不是在生产者配置或消息发送后进行延迟
  • RocketMQ支持多种延迟等级,每种等级对应不同的延迟时间
  • 注意,事务消息和延迟消息的直接组合在RocketMQ中并不是直接支持的特性
  • 因为事务消息的设计主要是围绕两阶段提交模型,确保消息发送与本地事务的一致性
  • 而延迟消息侧重于消息的定时投递
  • 然而,可以通过间接的方式结合这两个特性,即在事务消息的本地事务逻辑中包含对延迟操作的处理
  • 下面的示例尝试模拟一种结合方式,但请注意,这仅是一种逻辑上的结合,实际应用中需要根据具体业务场景仔细设计和测试
  • 方案思路:
    • 生产者:发送一个事务消息到特定的主题(例如DelayedTransactionTopic),该消息体中携带了需要进行延迟处理的信息。
    • 事务监听器:在ExecuteLocalTransaction方法中,不直接执行长时间的延迟逻辑,而是执行快速操作(如记录消息待处理状态或存入DB),然后返回primitive.Prepared状态。
    • 检查事务状态:在CheckLocalTransaction方法中,检查事务状态,如果需要,触发一个异步任务或消息队列中的消息,该消息携带延迟处理逻辑和真正的延迟时间。
    • 延迟处理服务:这个服务从队列中取出消息并根据消息中的指示进行真正的延迟操作,例如通过内部队列或定时任务系统(如分布式定时任务框架)来实现延迟执行
  • 相关生产者伪代码未完全实现,仅供参考
    package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
    )type DelayedTxListener struct{}func (d DelayedTxListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {// 假设这里将消息标识为待处理,并记录相关信息到数据库fmt.Println("Preparing transaction, storing message meta...")return primitive.Prepared
    }func (d DelayedTxListener) CheckLocalTransaction(msgExt *primitive.MessageExt) primitive.LocalTransactionState {// 在这里检查消息是否准备好执行延迟操作// 实际操作可能包括从数据库查询该消息的状态// 假设我们已经确定需要进行延迟操作,这里直接模拟提交return primitive.CommitMessageState
    }func main() {mqAddr := "127.0.0.1:9876"p, err := rocketmq.NewTransactionProducer(DelayedTxListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err)}err = p.Start()if err != nil {panic(err)}msg := &primitive.Message{Topic: "DelayedTransactionTopic",Body:  []byte("消息内容,可以包含延迟处理的详细信息"),}res, err := p.SendMessageInTransaction(context.Background(), msg)if err != nil {panic(err)}fmt.Printf("发送事务消息状态: %v\n", res.Status)time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
    }
    
  • 注意事项
    • 这个示例主要是概念性的,展示了如何在事务消息的上下文中计划后续的延迟处理步骤,而没有直接实现延迟消息的发送。
    • 实际应用中,您可能需要实现一个额外的后台服务或消息队列来处理这些“计划”好的延迟任务,确保它们能在预定时间得到执行。
    • 事务消息的两阶段提交机制仍然适用,只是延迟操作本身不在RocketMQ的直接事务控制范围内,而是作为一种业务逻辑上的后续处理。
    • 务必根据您的具体业务需求和RocketMQ的版本特性,仔细设计和测试这样的解决方案。

相关文章:

Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息 现在做一个RocketMQ的事务消息的 demo 1 &#xff09;生产者 package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/prim…...

【go】go初始化命令总结

包初始化 test项目目录下执行 go mod init test go mod tidy生成二进制可执行文件 go build -o test .\main.go...

vue音乐播放条

先看效果 再看代码 <template><div class"footer-player z-30 flex items-center p-2"><div v-if"isShow" class"h-12 w-60 overflow-hidden"><div :style"activeStyle" class"open-detail-control-wrap&…...

halcon实现浓淡补正,中间值补正-抽取暗

代码效果 抽取前 中值抽取暗 halcon函数代码 测试图片参数 NoiseCut:16 Gain:1 输入ImagePart NoiseCut Gain *获取直方图 get_domain (ImagePart, Domain) gray_histo_range(Domain,ImagePart,0,255,256, Histo, BinSize) area_center(Domain, NumPixels, Row, Column) …...

太速科技-FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡

FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡 一、板卡概述 该板卡为了考虑兼容1.8V电平IO&#xff0c;适配Virtex7&#xff0c;Kintex Ultrascale&#xff0c;Virtex ultrasacle FPGA而特制&#xff0c;如果要兼容原来的3.3V 也可以修改硬件参数。板卡支持1路…...

GPU短缺和模型效率的推动

1. 引言 随着全球GPU短缺和云计算成本的不断上升&#xff0c;开发更高效的AI模型成为了当前的焦点。技术如低秩适应&#xff08;LoRA&#xff09;和量化&#xff08;Quantization&#xff09;在优化性能的同时&#xff0c;减少了资源需求。这些技术不仅在当前的AI开发中至关重…...

linux在文件夹中查找文件内容

linux在文件夹中查找文件内容 在Linux中,可以通过以下多个途径,在文件夹中查找文件内容: 1、使用grep命令: grep -r "要查找的内容" /path/to/folder-r参数表示递归地在文件夹及其子文件夹中搜索。/path/to/folder是要搜索的文件夹路径。2、使用ack命令 ack …...

算法:11. 盛最多水的容器

11. 盛最多水的容器 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你…...

Hazelcast 分布式缓存 在Seatunnel中的使用

1、背景 最近在调研seatunnel的时候&#xff0c;发现新版的seatunnel提供了一个web服务&#xff0c;可以用于图形化的创建数据同步任务&#xff0c;然后管理任务。这里面有个日志模块&#xff0c;可以查看任务的执行状态。其中有个取读数据条数和同步数据条数。很好奇这个数据…...

分数限制下,选好专业还是选好学校?

目录 分数限制下&#xff0c;选好专业还是选好学校&#xff1f; 方向一&#xff1a;专业解析 1. 专业选择的重要性 2. 不同专业的优势与挑战 3. 个人专业选择经验分享 4. 实际场景下的“专业VS学校”选择方案 方向二&#xff1a;名校效应分析 1. 名校声誉与品牌效应 2…...

软件改为开机自启动

1.按键 win R,输入“shell:startup”命令, 然后就可以打开启动目录了&#xff0c;如下&#xff1a; 2.然后&#xff0c;把要开机启动的程序的图标拖进去即可。 参考&#xff1a;开机启动项如何设置...

集群down机的应急和恢复测试(非重做备机)

1. 集群的两台服务器的状态 实例 正常情况主备 ip 端口 node1 主机 192.168.6.6 9088 node2 备机 192.168.6.7 9088 2. 测试的步骤 down掉node1观察node2的状态在node2未自动切换的时候手动将node2调整为单机状态&#xff0c;模拟紧急使用模拟不紧急时&#xff0…...

【数据库系统概论复习】关系数据库与关系代数笔记

文章目录 基本概念数据库基本概念关系数据结构完整性约束 关系代数关系代数练习课堂练习 语法树 基本概念 数据库基本概念 DB 数据库&#xff0c; 为了存用户的各种数据&#xff0c;我们要建很多关系&#xff08;二维表&#xff09;&#xff0c;所以把相关的关系&#xff08;二…...

赛氪网受邀参加上海闵行区翻译协会年会,共探科技翻译创新之路

在科技飞速发展的时代背景下&#xff0c;翻译行业正面临着前所未有的机遇与挑战。作为连接高校、企业与社会的桥梁&#xff0c;赛氪网在推动翻译创新、促进学术交流方面展现出了独特的魅力。2024年6月9日&#xff0c;在华东师范大学外语学院举办的第十三届上海市闵行区翻译协会…...

项目管理进阶之EVM(挣值管理)

前言 项目管理进阶系列&#xff0c;终于有时间更新啦&#xff01;&#xff01;&#xff01;欢迎持续关注哦~ 上一节博主重点讲了一个环&#xff1a;PDCA&#xff0c;无论各行各业&#xff0c;上到航空航天、下到种地种菜&#xff0c;都离不开对质量的监督和改进。这个环既是一…...

PLSQL、Oracle以及客户端远程连接服务器笔记(仅供参考)

1.PLSQL参考链接&#xff1a; 全网最全最细的PLSQL下载、安装、配置、使用指南、问题解答&#xff0c;相关问题已汇总-CSDN博客文章浏览阅读2.9w次&#xff0c;点赞98次&#xff0c;收藏447次。双击之后&#xff0c;这里选择安装目录&#xff0c;你安装目录选的哪里&#xff0…...

Win快速删除node_modules

在Windows系统上删除 node_modules 文件夹通常是一个缓慢且耗时的过程。这主要是由于几个关键因素导致的&#xff1a; 主要原因 文件数量多且嵌套深&#xff1a; node_modules 文件夹通常包含成千上万的子文件夹和文件。由于其结构复杂&#xff0c;文件和文件夹往往嵌套得非常…...

【机器学习】基于顺序到顺序Transformer机器翻译

引言 1.1 序列到序列模型详解 序列到序列(Seq2Seq)模型是深度学习中处理序列数据转换问题的关键架构。在自然语言处理(NLP)任务中&#xff0c;如机器翻译、文本摘要和聊天机器人等&#xff0c;Seq2Seq模型能够高效地将输入序列转换为期望的输出序列。 模型架构&#xff1a; 编…...

TEA 加密的 Java 实现

import java.nio.ByteBuffer; import java.nio.ByteOrder;public class TeaUtils {private static final int DELTA 0x9E3779B9;private static final int ROUND 32;private static final String KEY "password";/*** 加密字符串&#xff0c;使用 TEA 加密算法*/p…...

鸿蒙开发电话服务:【@ohos.telephony.data (蜂窝数据)】

蜂窝数据 说明&#xff1a; 本模块首批接口从API version 7开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import data from ohos.telephony.data;data.getDefaultCellularDataSlotId getDefaultCellularDataSlotId(callback: Async…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

线程与协程

1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指&#xff1a;像函数调用/返回一样轻量地完成任务切换。 举例说明&#xff1a; 当你在程序中写一个函数调用&#xff1a; funcA() 然后 funcA 执行完后返回&…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装&#xff1b;只需暴露 19530&#xff08;gRPC&#xff09;与 9091&#xff08;HTTP/WebUI&#xff09;两个端口&#xff0c;即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

HTML前端开发:JavaScript 常用事件详解

作为前端开发的核心&#xff0c;JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例&#xff1a; 1. onclick - 点击事件 当元素被单击时触发&#xff08;左键点击&#xff09; button.onclick function() {alert("按钮被点击了&#xff01;&…...

Typeerror: cannot read properties of undefined (reading ‘XXX‘)

最近需要在离线机器上运行软件&#xff0c;所以得把软件用docker打包起来&#xff0c;大部分功能都没问题&#xff0c;出了一个奇怪的事情。同样的代码&#xff0c;在本机上用vscode可以运行起来&#xff0c;但是打包之后在docker里出现了问题。使用的是dialog组件&#xff0c;…...

Golang——7、包与接口详解

包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案

在大数据时代&#xff0c;海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构&#xff0c;在处理大规模数据抓取任务时展现出强大的能力。然而&#xff0c;随着业务规模的不断扩大和数据抓取需求的日益复杂&#xff0c;传统…...

go 里面的指针

指针 在 Go 中&#xff0c;指针&#xff08;pointer&#xff09;是一个变量的内存地址&#xff0c;就像 C 语言那样&#xff1a; a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10&#xff0c;通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...

MySQL的pymysql操作

本章是MySQL的最后一章&#xff0c;MySQL到此完结&#xff0c;下一站Hadoop&#xff01;&#xff01;&#xff01; 这章很简单&#xff0c;完整代码在最后&#xff0c;详细讲解之前python课程里面也有&#xff0c;感兴趣的可以往前找一下 一、查询操作 我们需要打开pycharm …...

面试高频问题

文章目录 &#x1f680; 消息队列核心技术揭秘&#xff1a;从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"&#xff1f;性能背后的秘密1.1 顺序写入与零拷贝&#xff1a;性能的双引擎1.2 分区并行&#xff1a;数据的"八车道高速公路"1.3 页缓存与批量处理…...