网站监测/百度权重查询工具
一、带标签的Tag消息
1.1、概述
RocketMQ提供消息过滤的功能,通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候,根据业务逻辑可能需要区分,比如带有tagA标签的消息被消费者A消费,带有tagB标签的消息被消费者B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才能区别对待。
1.2、什么时候该用Topic,什么时候该用Tag
不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:
(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;
(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;
(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;
(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;
总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。
二、案例代码
2.1、pom
同案例五
2.2、RocketMQConstant
同案例五
2.3、消费者
2.3.1、TagConsumer1
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer1 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer1Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","NBA");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer1]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer1] start success");}}
2.3.2、TagConsumer2
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer2 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer2Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","RUN");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer2]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer2] start success");}}
2.3.3、TagConsumer3
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer3 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer3Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","STAR || CAR");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer3]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer3] start success");}}
2.3.4、TagConsumer4
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:33* @Description: Tag消息消费者*/
@Slf4j
public class TagConsumer4 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumer4Group");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("TagTopic","CAR || MOBILE || TOURISM");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info("消费者[TagConsumer4]收到消息,消息详情:{}", StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("消费者[TagConsumer4] start success");}}
2.4、TagProducer
package org.star.tag.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;import java.nio.charset.StandardCharsets;/*** @Author: 一叶浮萍归大海* @Date: 2023/8/30 10:22* @Description: Tag消息生产者*/
@Slf4j
public class TagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("TagProducer");producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);producer.start();log.info("Tag消息生产者 start success");String[] tags = new String[]{"NBA", "RUN", "STAR","CAR","MOBILE","TOURISM"};for (int i = 0; i < 6; i++) {String tag = tags[i % tags.length];String content = "";if ("NBA".equals(tag)) {content = "NBA message,消息编号[" + i + "]";} else if ("RUN".equals(tag)) {content = "RUN message,消息编号[" + i + "]";} else if ("STAR".equals(tag)) {content = "STAR message,消息编号[" + i + "]";} else if ("CAR".equals(tag)) {content = "CAR message,消息编号[" + i + "]";} else if ("MOBILE".equals(tag)) {content = "MOBILE message,消息编号[" + i + "]";} else if ("TOURISM".equals(tag)) {content = "TOURISM message,消息编号[" + i + "]";}log.info("当前tag:{},消息内容:{}", tag, content);Message message = new Message("TagTopic", tag, content.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);log.info("sendStatus:{},brokerName:{},queueId:{},msgId:{}", result.getSendStatus(), result.getMessageQueue().getBrokerName(), result.getMessageQueue().getQueueId(), result.getMsgId());}producer.shutdown();}}
2.5 、控制台打印结果
# 生产者
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - Tag消息生产者 start success
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - 当前tag:NBA,消息内容:NBA message,消息编号[0]
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D530000
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - 当前tag:RUN,消息内容:RUN message,消息编号[1]
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D5C0001
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - 当前tag:STAR,消息内容:STAR message,消息编号[2]
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:2,msgId:0AA867618F8018B4AAC2262C1D640002
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - 当前tag:CAR,消息内容:CAR message,消息编号[3]
09:53:45.322 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:3,msgId:0AA867618F8018B4AAC2262C1D680003
09:53:45.323 [main] INFO org.star.tag.producer.TagProducer - 当前tag:MOBILE,消息内容:MOBILE message,消息编号[4]
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D6B0004
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - 当前tag:TOURISM,消息内容:TOURISM message,消息编号[5]
09:53:45.329 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D6E0005# 消费者TagConsumer1
09:53:45.310 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer1 - 消费者[TagConsumer1]收到消息,消息详情:NBA message,消息编号[0]# 消费者TagConsumer2
09:53:45.316 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer2 - 消费者[TagConsumer2]收到消息,消息详情:RUN message,消息编号[1]# 消费者TagConsumer3
09:53:45.322 [ConsumeMessageThread_3] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息,消息详情:STAR message,消息编号[2]
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息,消息详情:CAR message,消息编号[3]# 消费者TagConsumer4
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:CAR message,消息编号[3]
09:53:45.344 [ConsumeMessageThread_6] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:MOBILE message,消息编号[4]
09:53:45.344 [ConsumeMessageThread_5] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息,消息详情:TOURISM message,消息编号[5]
相关文章:

系列十二、Java操作RocketMQ之带标签Tag的消息
一、带标签的Tag消息 1.1、概述 RocketMQ提供消息过滤的功能,通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候,根据业务逻辑可能需要区分,比如带有tagA标签的消息被消费者A消费,带有tagB标签的消息被消费者B消费&…...

Java面向对象学习笔记-1
前言 “Java 学习笔记” 是为初学者和希望加深对Java编程语言的理解的人们编写的。Java是一门广泛应用于软件开发领域的强大编程语言,它的语法和概念对于初学者来说可能有些复杂。这份学习笔记的目的是帮助读者逐步学习Java的基本概念,并提供了一系列示…...

el-table根据data动态生成列和行
css //el-table-column加上fixed后会导致悬浮样式丢失,用下面方法可以避免 .el-table__body .el-table__row.hover-row td{background-color: #083a78 !important; } .el-table tbody tr:hover>td {background: #171F34 !important; }html <el-table ref&quo…...

【c++】如何有效地利用命名空间?
🌱博客主页:青竹雾色间 😘博客制作不易欢迎各位👍点赞⭐收藏➕关注 ✨人生如寄,多忧何为 ✨ 目录 前言什么是命名空间?命名空间的语法命名空间的使用避免命名冲突命名空间的嵌套总结 前言 当谈到C编…...

Go语言传参
为了让新手尽快熟悉go的使用,特记录此文,不必谢我,转载请注明! Go 语言中参数传递的各种效果,主要内容包括: 传值效果指针传递结构体传递map 传递channel 传递切片传递错误传递传递效果示例传递方式选择原文连接:https://mp.weixin.qq.com/s?__biz=MzA5Mzk4Njk1OA==&…...

SAP PI 配置SSL链接接口报错问题处理Peer certificate rejected by ChainVerifier
出现这种情况一般无非是没有正确导入证书或者证书过期的情况 第一种,如果没有导入证书的话,需要在NWA中的证书与验证-》CAs中导入管理员提供的证书,这里需要注意的是,需要导入完整的证书链。 第二种如果是证书过期的,…...

【MyBatisⅡ】动态 SQL
目录 🎒1 if 标签 🫖2 trim 标签 👠3 where 标签 🦺4 set 标签 🎨5 foreach 标签 动态 sql 是Mybatis的强⼤特性之⼀,能够完成不同条件下不同的 sql 拼接。 在 xml 里面写判断条件。 动态SQL 在数据库里…...

音视频入门基础理论知识
文章目录 前言一、视频1、视频的概念2、常见的视频格式3、视频帧4、帧率5、色彩空间6、采用 YUV 的优势7、RGB 和 YUV 的换算 二、音频1、音频的概念2、采样率和采样位数①、采样率②、采样位数 3、音频编码4、声道数5、码率6、音频格式 三、编码1、为什么要编码2、视频编码①、…...

Pytorch中如何加载数据、Tensorboard、Transforms的使用
一、Pytorch中如何加载数据 在Pytorch中涉及到如何读取数据,主要是两个类一个类是Dataset、Dataloader Dataset 提供一种方式获取数据,及其对应的label。主要包含以下两个功能: 如何获取每一个数据以及label 告诉我们总共有多少的数据 Datal…...

python如何使用打开文件对话框选择文件?
python如何使用打开文件对话框选择文件? ━━━━━━━━━━━━━━━━━━━━━━ 在Python中,可以使用Tkinter库中的filedialog子模块来打开一个文件对话框以供用户选择文件。以下是一个简单的例子,演示如何使用tkinter.filedialog打…...

虚拟化和容器
文章目录 1 介绍1.1 简介1.2 虚拟化工作原理1.3 两大核心组件:QEMU、KVMQEMUKVM 1.4 发展历史1.5 虚拟化类型1.6 云计算与虚拟化1.7 HypervisorHypervisor分为两大类 1.8 虚拟化 VS 容器 2 虚拟化应用dockerdocker 与虚拟机的区别 K8Swine 参考 1 介绍 1.1 简介 虚…...

LeetCode-78-子集
题目描述: 给你一个整数数组 nums ,数组中的元素 互不相同。返回该数组所有可能的子集(幂集)。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 题目链接:LeetCode-78-子集 解题思路:递归回溯 题…...

js对象转json文件
目录 需求1.首先寻找类似需求的数据2.对数据进行转换3.将转换后的数据转为json文件4.完整代码 需求 需求:在做项目时,遇到了需要制作地址列表的功能,这一般都会用到一些开源的组件库,但是有个问题是不同组件库之间的城市列表数据结…...

【免费模板】2023数学建模国赛word+latex模板免费分享
无需转发 免费获取2023国赛模板,获取方式见文末 模板文件预览如下: 模板参考格式如下: (题目)XXXXXX 摘 要: 开头段:需要充分概括论文内容,一般两到三句话即可,长度控…...

基于HBuilder X平台下的 驾校报名考试管理系统 uniapp 微信小程序3n9o5
本课题研究的是基于HBuilder X系统平台下的驾校管理系统,开发这款驾校管理系统主要是为了帮助学员可以不用约束时间与地点进行查看教练信息、考场信息等内容。本文详细讲述了驾校管理系统的界面设计及使用,主要包括界面的实现、控件的使用、界面的布局和…...

电商3D资产优化管线的自动化
如果你曾经尝试将从 CAD 程序导出的 3D 模型上传到 WebGL 或 AR 服务,那么可能会遇到最大文件大小、永无休止的进度条和糟糕的帧速率等问题。 为了创作良好的在线交互体验,优化 3D 数据的大小和性能至关重要。 这也有利于你的盈利,因为较小的…...

Android 大图显示优化方案-加载Gif 自定义解码器
基于Glide做了图片显示的优化,尤其是加载Gif图的优化,原生Glide加载Gif图性能较低。在原生基础上做了自定义解码器的优化,提升Glide性能 Glide加载大图和Gif 尤其是列表存在gif时,会有明显卡顿,cpu和内存占用较高&…...

Leetcode.664 奇怪的打印机
题目链接 Leetcode.664 奇怪的打印机 hard 题目描述 有台奇怪的打印机有以下两个特殊要求: 打印机每次只能打印由 同一个字符 组成的序列。每次可以在从起始到结束的任意位置打印新字符,并且会覆盖掉原来已有的字符。 给你一个字符串 s ,你…...

正中优配:散户怎么实现T+0?散户在股市上怎么变相T+0?
T0是指当天买入的标的物,在当天就能卖出的买卖方式,其中,在a股市场上,散户能够通过一些办法直接地完成T0买卖方式,接下来,正中优配为大家预备了相关内容,以供参阅。 散户在股票市场上࿰…...

ZooInspector
一、在window,使用我们先打开Zookeeper,目录bin下的zkServer.cmd,把Zookeeper运行起来 编辑https://img.111com.net/attachment/art/187687/5f0c25fbe580c.png 二、可以使用目录bin下的zkCli.cmd,查询Zookeeper数据的方式,但是…...

2023高教社杯 国赛数学建模B题思路 - 多波束测线问题
1 赛题 B 题 多波束测线问题 单波束测深是利用声波在水中的传播特性来测量水体深度的技术。声波在均匀介质中作匀 速直线传播, 在不同界面上产生反射, 利用这一原理,从测量船换能器垂直向海底发射声波信 号,并记录从声波发射到信…...

【计算机视觉 | 目标检测】arxiv 计算机视觉关于目标检测的学术速递(9 月 4 日论文合集)
文章目录 一、检测相关(8篇)1.1 Impact of Image Context for Single Deep Learning Face Morphing Attack Detection1.2 A Theoretical and Practical Framework for Evaluating Uncertainty Calibration in Object Detection1.3 What Makes Good Open-Vocabulary Detector: A…...

游戏优化注意点
特效性能分析: 1、粒子数量太多,这个会对CPU的耗时产生一定的压力。 2、粒子的size太大,这样容易导致渲染的像素数量非常高。 3、Overdraw非常高,当场上粒子数非常高导致叠层很高,会造成Overdraw很高,这会…...

【unity3D】如何修改相机的默认视角
💗 未来的游戏开发程序媛,现在的努力学习菜鸡 💦本专栏是我关于游戏开发的学习笔记 🈶本篇是unity的如何修改相机的默认视角 如何修改相机的默认视角 Game窗口运行的话视角是这样的: 此时Scene窗口的视角是这样的&…...

Docker的初级使用
Docker的初级使用 Docker的安装1.1 如果之前安装过旧版本的Docker,可以使用下面命令卸载:1.2.安装docker1.3.启动docker1.4.配置镜像加速2.CentOS7安装DockerCompose2.1.下载2.2.修改文件权限2.3.Base自动补全命令:3.Docker镜像仓库3.1.简化版镜像仓库3.2.带有图形化界面版本…...

minimumLineSpacing和minimumInteritemSpacing问题研究
结论:minimumLineSpacing和minimumInteritemSpacing问题研究 (1)如果cell的宽度是固定的,方向是水平时, 1 3 5 2 4 6 minimumLineSpacing 是 12 到 34的距离 minimumInteritemSpacing 是1到2的距离 (2)如果cell的宽度是不固定的࿰…...

【操作系统】聊聊Linux内存工作机制
内存主要是用来存储系统和应用程序的指令、数据、缓存等 内存映射 内存是需要安全机制保护的,所以只有内核才可以直接访问物理内存。进程如果要访问内存需要通过独立的虚拟地址空间。 虚拟地址空间其实包含两部分。一部分是内核空间,另一部分就是用户…...

MySQL索引的类型有哪些?
分析&回答 从功能逻辑角度,可分为: 普通索引 INDEX(普通索引) ALTER TABLE table_name ADD INDEX index_name ( column )唯一索引 UNIQUE(唯一索引) ALTER TABLE table_name ADD UNIQUE (column)主键索引 PRIMARY KEY(主键索引…...

【JavaScript】在指定dom元素前面创建标签元素
一、基础操作过程 要在指定的DOM元素前面创建标签元素,有以下步骤: 获取指定的DOM元素:使用document.querySelector()或document.getElementById()等方法来获取指定的DOM元素。 const targetElement document.querySelector(#targetElement…...

ARMv8 TTBRx寄存器
ARMv8 TTBRx寄存器 1 TTBR0_ELx and TTBR1_ELx2 TTBR0_ELx2.1 TTBR0_EL12.2 TTBR0_EL22.3 TTBR0_EL33 TTBR13.1 TTBR1_EL13.2 TTBR1_EL2 4 访问TTBRx寄存器4.1 TTBR0_ELx4.2 TTBR1_ELx 5 TTBRx保留的是物理地址还是虚拟地址5.1 保存的是物理地址还是虚拟地址5.2 为什么是物理地…...