RocketMQ基础篇(一)
目录
- 一、发送消息类型
- 1、同步消息
- 2、异步消息
- 3、单向消息
- 4、顺序消费
- 5、延迟消费
- 二、消费模式
- 1、集群模式
- 2、广播模式
- 3、消费模式扩展
- 4、如何配置
- 三、其他用法
- 1、事务消息
- 2、过滤消息
- 1)Tag过滤
- 2)SQL方式过滤
源码放到了GitHub仓库上,地址 https://github.com/shengwanping/SpringBoot-RocketMQ/tree/dev_01
一、发送消息类型
1、同步消息
发送同步消息是指producer向 broker发送消息,执行API时同步等待,直到broker服务器返回发送结果
// 可以使用RocketMQTemplate类下面的syncSend方法
SendResult sendResult = rocketMQTemplate.syncSend("topic_001", "Hello RocketMQ 同步消息");
System.out.println(sendResult);
2、异步消息
指producer向broker发送消息时异步执行,不会影响后面逻辑。而异步里面会调用一个回调方法,来处理消息发送成功或失败的逻辑
// 可以使用RocketMQTemplate类下面的asyncSend方法
rocketMQTemplate.asyncSend("topic_001", "Hello RocketMQ 异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步消息 发送成功!");}@Overridepublic void onException(Throwable throwable) {System.out.println("异步消息 发送失败!");}});
3、单向消息
是指producer向 broker发送消息,执行API时直接返回,不等待broker 服务器的响应
// 可以使用RocketMQTemplate类下面的sendOneWay方法
rocketMQTemplate.sendOneWay("topic_001", "Hello RocketMQ 单项消息");
4、顺序消费
就是让消费者按照生产者发送消息的顺序去消费。
应用场景:比如电商系统需要实现,订单创建、支付、完成顺序的流程。RocketMQ默认是并发消费,没有顺序的。需要顺序消费需要通过如下配置:
首先消费者@RocketMQMessageListener注解,consumeMode
设置为ConsumeMode.ORDERLY
@RocketMQMessageListener(topic = "topic_001",consumerGroup = "${rocketmq.consumer.group}",consumeMode = ConsumeMode.ORDERLY)
然后生产者调用含有Orderly的方法:
// topic 消息 队列rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-创建", "1001");rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-支付", "1001");rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-完成", "1001");rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-创建", "1002");rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-支付", "1002");rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-完成", "1002");
在消费者打印结果:
1001顺序消费-创建
1001顺序消费-消费
1001顺序消费-完成
1002顺序消费-创建
1002顺序消费-消费
1002顺序消费-完成
5、延迟消费
就是生产者设定延迟时间,时间到了消费者才能去消费
应用场景:一种比较常见的场要就是在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后customer会收到这条订单满息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式去处理了,
RocketMQ不能自定义延迟时间,有特定等级如下
延迟等级 0 不延迟,1 延时1s,2 延时5s,3 延时10s,4 延时 30s,以此类推。。。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 延迟方法 topic 消息 默认3秒,没有发送消息会抛出异常 延迟等级
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel)
// 演示
rocketMQTemplate.syncSend("topic_001", MessageBuilder.withPayload("延迟消费5秒").build(), 3000, 2);
rocketMQTemplate.syncSend("topic_001", MessageBuilder.withPayload("延迟消费30秒").build(), 3000, 4);
二、消费模式
RocketMQ的消费者有两种消费模式:BROADCASTING
广播模式,CLUSTERING
集群模式,默认集群消费模式。
1、集群模式
理解:
如果这个消费者组都是集群模式,那么这个消费者组会去平分这个topic下面的消息,且一条消息只能被一个消费者消费
。
举个例子:
生产者给topic_1发送了10条消息,消费topic_1的这个消费者组有2个消费者,那么这两个消费者就会平分这10条消息,每个消费者5条消息。
但是经测试有时也会一个消费者6条,另一个消费者4条。(这个问题笔者占时也不清楚,待解答)
2、广播模式
理解:
如果这个消费者组都是广播模式,那么这个消费者组中的每个消费者都会去执行这个topic下面所有的消息,相当于一条消息会被执行多次
。
举个例子:
生产者给topic_2发送了10条消息,消费topic_2的这个消费者组有2个消费者,那么这两个消费者都会去消费这10条消息
3、消费模式扩展
生产者给Topic_1推送了10条消息,然后同时有两个消费者组对Topic_1进行消费。
消费者组1 中有两个消费者,分别是消费者A和消费者B,消费者A是集群模式,消费者B是广播模式。
消费者组2 中有一个消费者,是消费者C,消费者C是广播模式。
这个时候是怎么消费的呢?
答案是:消费者C会全量消费10条消息,消费者B也会全量消费10条消息,而消费者A只会消费一半消息(可能4条、5条、6条)
解释:由上可以看出,消费多少消息是
由消费者的消费模式决定的
。因为B、C都是广播模式,所以会消费这个Topic下面所有消息,而A是集群模式,他只会消费到的消息是
消费消息数量 = Topic中消息总数/消费者组中消费者数量
4、如何配置
在消费者 @RocketMQMessageListener
注解中配置messageModel
参数,(没有设置默认集群模式)
设置为MessageModel.CLUSTERING
,则是集群模式
设置为MessageModel.BROADCASTING
,则是广播模式
如下:
@RocketMQMessageListener(topic = "topic_001",consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.CLUSTERING)
三、其他用法
1、事务消息
Half(Prepare) Message——半消息(预处理消息)
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
Message Status Check——消息状态回查
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit或Rollback)。可以看出, Message Status Check主要用来解决分布式事务中的超时问题。
1.应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。
2. prepare消息发送成功后,应用模块执行数据库事务(本地事务)。
3. 根据数据库事务执行的结果,再返回Commit或Rollback给MQ。
4. 如果是Commit, MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息。
5. 第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息) ,处理结果同第4步。
6. MQ消费的成功机制由MQ自己保证。
生产者发送事务消息:
rocketMQTemplate.sendMessageInTransaction("topic_001", MessageBuilder.withPayload("事务消息").build(), null);
// rocketmq事务消息 配置类
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,如果是COMMIT则消息发送成功,如果是ROLLBACK则直接丢弃消息,如果是UNKNOWN则调用checkLocalTransaction()try {System.out.println("executeLocalTransaction");}catch (Exception e){e.printStackTrace();return RocketMQLocalTransactionState.UNKNOWN;}return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {// 检查本地事务(最多调用15次,如果全部失败则ROLLBACK丢弃消息)System.out.println("checkLocalTransaction");return RocketMQLocalTransactionState.COMMIT;}
}
2、过滤消息
在消费端进行消息消费的时候,我们根据业务需求,可以对消息进行过滤处理需要的消息
尤其是广播模式下,消息过滤经常使用
RocketMQ提供了TAG和SQL表达式两种消息过滤方式
1)Tag过滤
生产者需要在Topic后面加上 冒号 + TAG
消费者需要配置 selectorType = SelectorType.TAG
和 selectorExpression
消费端配置如下:
@Component
// 指定topic 和 消费者组
@RocketMQMessageListener(topic = "topic_001",consumerGroup = "${rocketmq.consumer.group}",selectorType = SelectorType.TAG, selectorExpression = "TAG1 || TAG2")
public class ConsumerMode implements RocketMQListener<String> { // 继承RocketMQListener接口@Overridepublic void onMessage(String s) {System.out.println("收到的消息是:"+s);}
}
生产端发送消息如下:
rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG1", MessageBuilder.withPayload("TAG1消息").build());rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG2", MessageBuilder.withPayload("TAG2消息").build());rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG3", MessageBuilder.withPayload("TAG3消息").build());
消费端打印:
收到的消息是:{"payload":"TAG1消息","headers":{"id":"3df5f1a5-cbb2-fac5-e95b-489f29bc4a77","timestamp":1678204919985}}
收到的消息是:{"payload":"TAG2消息","headers":{"id":"1112d1cf-e1a9-bc2c-b38d-59a667196385","timestamp":1678204920260}}
2)SQL方式过滤
SQL表达式方式可以根据发送消息时输入的属性进行一些计算。
RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。
数字比较,如>,>=, <,<=,
BETWEEN, =;
字符比较,如:=,<>,IN;
IS NULL or IS NOT NULL;
逻辑运算符:AND, OR, NOT;
常量类型:
数值,如:123,3.1415;
字符,如: ‘abc’,必须使用单引号;
NULL,特殊常量
Boolean, TRUE or FALSE;
首先要在broker配置文件里面加入支持,否则会报错
1、rocketmq-4.4.0\conf\broker.conf 加入enablePropertyFilter = true
2、重启broker 并指定 配置文件 mqbroker.cmd -n localhost:9876 autoCreateToopicEnable=true -c ../conf/broker.conf
完成上面两步操作接口用sql方式过滤消息
生产者:
Message msg1 = MessageBuilder.withPayload("rocketmq过滤消息测试01").build();Map<String, Object> headers = new HashMap<>();headers.put("name", "xiao ming");headers.put("a", 2) ;rocketMQTemplate.convertAndSend("topic_001", msg1, headers);Message msg2 = MessageBuilder.withPayload("rocketmq过滤消息测试02").build();Map<String, Object> headers1 = new HashMap<>();headers1.put("name", "xiao hua");headers1.put("a", 7) ;rocketMQTemplate.convertAndSend("topic_001", msg2, headers1);
消费者: 主要是selectorType = SelectorType.SQL92, selectorExpression = "name = 'xiao ming' and a < 5")
@Component
// 指定topic 和 消费者组
@RocketMQMessageListener(topic = "topic_001",consumerGroup = "${rocketmq.consumer.group}",
// selectorType = SelectorType.TAG, selectorExpression = "TAG1 || TAG2")selectorType = SelectorType.SQL92, selectorExpression = "name = 'xiao ming' and a < 5")
public class ConsumerMode implements RocketMQListener<String> { // 继承RocketMQListener接口@Overridepublic void onMessage(String s) {System.out.println("收到的消息是:"+s);}
}
消费者打印:
根据过滤条件只打印了第一条消息
收到的消息是:{"payload":"rocketmq过滤消息测试01","headers":{"id":"da3ac866-4140-b440-2f4c-ecb5ce4d9965","timestamp":1678206096192}}
相关文章:

RocketMQ基础篇(一)
目录一、发送消息类型1、同步消息2、异步消息3、单向消息4、顺序消费5、延迟消费二、消费模式1、集群模式2、广播模式3、消费模式扩展4、如何配置三、其他用法1、事务消息2、过滤消息1)Tag过滤2)SQL方式过滤源码放到了GitHub仓库上,地址 http…...

Android前沿技术—gradle中的build script详解
build.gradle是gradle中非常重要的一个文件,因为它描述了gradle中可以运行的任务,今天本文将会带大家体验一下如何创建一个build.gradle文件和如何编写其中的内容。 project和task gradle是一个构建工具,所谓构建工具就是通过既定的各种规则…...

深入浅出PaddlePaddle函数——paddle.zeros_like
分类目录:《深入浅出PaddlePaddle函数》总目录 相关文章: 深入浅出PaddlePaddle函数——paddle.Tensor 深入浅出PaddlePaddle函数——paddle.ones 深入浅出PaddlePaddle函数——paddle.zeros 深入浅出PaddlePaddle函数——paddle.full 深入浅出Padd…...

物料-零部件分类属性
离散制造业的研发、生产跟产品零部件紧密联系在一起,从企业业务流程来说零部件涉及研发、采购、仓储、生产、质量、售后和配件等多个部门,为了更好地管理零部件,下面我们一起来看看零部件概念及分类。 1、按行业属性分类 (1&…...

TypeError: cannot pickle ‘module‘ object
创建python对象时报错: TypeError: cannot pickle module object 原因: 很大可能是类成员错误的使用了第三方包(别名)等,具体排查方法可参考: import redisimport pickle from pprint import pformat as …...

[MySQL索引]3.索引的底层原理(二)
索引的底层原理(二)InnoDB的主键和二级/辅助索引树(涉及回表)MyISAM存储引擎的主键和二级索引树InnoDB的主键和二级/辅助索引树(涉及回表) 看下面这张student数据库表: 场景一:uid…...

JavaScript混淆——逆向思维的艺术
在本文中我们将介绍三种常见的JavaScript混淆技术。 1.混合名称 通过将函数名称和变量名混合使用,我们可以使代码更难读。下面是一个使用名称混合的JavaScript函数。 function c(a){var b[2,4,8,a],db[0]b[1]b[2]b[3],ed""a;return e}混合名称技术通过…...

数据库管理-第六十期 监听(20230309)
数据库管理 2023-03-09第六十期期 监听1 无法访问2 监听配置3 问题复现与解决4 静态监听5 记不住配置咋整总结第六十期期 监听 不知不觉又来到了一个整10期数,我承认上一期有很大的划水的。。。嫌疑吧,本期内容是从帮群友解决ADG前置配置时候的一个问题…...

概率论与数理统计相关知识
本博客为《概率论与数理统计--茆诗松(第二版)》阅读笔记,目的是查漏补缺前置知识数学符号连乘符号:;总和符号:;“任意”符号:∀;“存在”符号&…...

SOC计算方法:卡尔曼滤波算法
卡尔曼滤波算法是一种经典的状态估计算法,它广泛应用于控制领域和信号处理领域。在电动汽车领域中,卡尔曼滤波算法也被广泛应用于电池管理系统中的电池状态估计。其中,电池的状态包括电池的剩余容量(SOC)、内阻、温度等…...

【C语言】自定义类型、枚举类型与宏定义
目录一、自定义类型二、宏定义三、枚举类型一、自定义类型 自定义类型关键字:typedef,用新的类型名称代替原有的类型名。 例如: typedef char u8; u8 x;表示指定u8为新的类型名,代替char,作用与char相同,…...

Java进阶(下篇2)
Java进阶(下篇2)一、IO流01.File类的使用1.1、File类的实例化1.2、File类的常用方法11.3、File类的常用方法21.4、课后练习02、IO流原理及流的分类2.1、IO流原理2.2、流的分类2.3、IO 流体系03、节点流(或文件流)3.1、FileReader读入数据的基本操作3.2、…...

03单链表
、# 单链表 单链表是一种链式存储的数据结构,用一组地址任意的存储单元存放线性表中的数据元素。单链表中的每个结点包含一个数据域和一个指针域,数据域存放数据元素,指针域存放下一个结点的地址。单链表的第一个结点称为头结点,…...

ESLint、Prettier插件的安装与使用
在统一代码风格这一块,通常大家都会用到ESLint。虽然 ESLint 本身具备自动格式化代码的功能,但ESLint 的主要优势在于代码的风格检查并给出提示,而在代码格式化这一块 Prettier 做的更加专业,因此在实际项目开发中我们经常将 ESLi…...

matlab在管理学中的应用简matlab基础【三】
规划论及MATLAB计算 1、线性规划 问题的提出 例1. 某工厂在计划期内要安排甲、乙两种产品的生产,已知生产单位产品所需的资源A、B、C的消耗以及资源的计划期供给量,如下表: 问题:工厂应分别生产多少单位甲、乙产品才能使工厂获…...

NDK JNI 变声器实现
Android NDK 导入 C库的开发流程学习;通过使用fmod的C库,实现变声器功能。导入库文件1)复制fmod的C库到cpp目录下2)复制fmod的so库到jniLibs目录下3)复制fmod的jar库到libs目录下4)将声音文件复制到assets目…...

VMLogin防关联指纹浏览器的主帐号和子账号区别介绍
VMLogin主账户管理子账户,主要用于团队协作,分账户登录使用,主账户相当于老板,子账户相当于员工。 主账户创建并管理子账户; 主账户可以修改子账户的密码; 主账户可以设置子账户是否有创建配置文件权限&a…...

Apache DolphinScheduler GitHub Star 突破 10000!
点击蓝字 关注我们今天,Apache DolphinScheduler GitHub Star 突破 10000,项目迎来一个重要里程碑。这表明 Apache DolphinScheduler 已经在全球的开发者和用户中获得了广泛的认可和使用。DolphinScheduler 旨在解决公司日常运营中的大数据处理工作流调度…...

程序员中的女性力量——做不被定义的自己
她是office lady,亦是程序媛,程序员界的靓丽色彩,不可或缺。 “只有那些疯狂到以为自己能够改变世界的人——才能真正改变世界。” 女性该如何定义自己?程序媛怎么发挥自己最大的价值。 争取自己做选择,经济和思想都独…...

pb中Datawindow中每页打印固定行
Datawindow中每页打印固定行 第一步: 增加一个计算列,此计算列必须放在Detail段,Expression中输入:ceiling(getrow()/20),这里20还可以用全局函数取代,这样可以允许用户任意设置每页打印多少行。 第二步: 定义分组,选择菜单Rows->Create Group...按计算列字段…...

华为OD机试 - 内存池(C 语言解题)【独家】
最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 使用说明本期题目:内存池题…...

SaaS简介
SaaS 简介 SaaS被认为是云计算的一部分,其他包括基础设施即服务(IaaS)、平台即服务(PaaS)、桌面即服务(DaaS)、托管软件即服务(MSaaS)、移动后端即服务(MBaaS)、数据中心即服务(DCaaS)、集成平台即服务(iPaaS)和信息技术管理即服务(ITMaaS) SaaS应用程序通常由web浏…...

unity 实现使用三张图片来表达车速,通过传值达到车速
//速度 public Image SpeedNums_Unit; public Image SpeedNums_Ten; //public Image SpeedNums_Hundred; //kw public Image MileageNums_Unit; public Image MileageNums_Ten; /// /// 仪表速度UI /// private void SpeedUI(string speedStr) {if (SpeedNums_Unit == null) …...

程序员看过都说好的资源网站,你值得拥有。
程序员必备的相关资源网站一.技术社区1.GitHub2.Gitee(码云)3.稀土掘金4.OSCHINA开源中国5.CSDN6.博客园7.SegmentFault(思否)8.Stack Overflow9.Golang中文社区10.ChinaUnix11.51CTO12.Ruby China二.技术教程1.Devdocs2.码农教程…...

【MySQL高级篇】第03章 用户与权限管理
第03章 用户与权限管理 1. 用户管理 1.1 登录MySQL服务器 启动MySQL服务后,可以通过mysql命令来登录MySQL服务器,命令如下: mysql –h hostname|hostIP –P port –u username –p DatabaseName –e "SQL语句"-h参数后面接主机…...

MySQL的分库分表?通俗易懂
1- 为什么要分库分表 如果一个网站业务快速发展,那这个网站流量也会增加,数据的压力也会随之而来,比如电商系统来说双十一大促对订单数据压力很大,Tps十几万并发量,如果传统的架构(一主多从)&a…...

elasticsearch 查询语法
match_all 查询所有 GET test/_search {"query": {"match_all": {}} }match 单字段匹配查询 GET test/_search {"query":{"match":{"name":"zhangsan"}} }multi_match 多字段匹配查询 GET test/_search {"…...

深入剖析MVC模型与三层架构
MVC(Model-View-Controller)模型和三层架构都是常见的软件架构模式,用于实现大型应用程序和软件系统。下面是对它们的深入剖析: MVC模型 MVC模型是一种将应用程序分成三个主要组件的软件架构模式,分别是模型…...

使用 Wall 搭建个人照片墙和视频墙
下载 Github:https://github.com/super-tongyao/wall 国内仓库(不推荐,只做加速访问,无编译包和发行版,以github仓库为准):https://gitee.com/Super_TongYao/wall 推荐github仓库,下载最新版…...

03_Linux压缩解压,用户用户组,文件权限
目录 Linux下常用的压缩格式 gzip 压缩工具 gzip 对文件夹进行压缩 bzip2 压缩工具 tar打包工具 对.tar.bz2 进行压缩和解压缩 对.tar.gz 进行压缩和解压缩 rar格式 zip格式 Linux用户 Linux用户组 创建用户和用户组 Linux文件权限 Linux文件权限修改 Linux下常用…...