Spring boot封装rocket mq 教程
1、rocket mq版本
5.1.3
2、pom引入rocket mq依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency>
3、发送MQ消息工具类
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;@Slf4j
public class MqSendUtil {@SneakyThrowspublic static MessageId sendMq(String topic, String tag, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag(tag)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}@SneakyThrowspublic static MessageId sendMqNoTag(String topic, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}}
4、发送MQ消息测试代码
import cn.hutool.core.util.IdUtil;
import org.recipe.draw.common.util.MqSendUtil;public class MqSendTest {public static void test1() {MqSendUtil.sendMq("demo", "tag", "哈哈哈哈tag", IdUtil.getSnowflakeNextIdStr());}public static void main(String[] args) {test1();}
}
5、MessageContext 消息内容的封装
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Collection;
import java.util.Map;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {private String messageId;private String topic;private String body;private Map<String, String> properties;private Collection<String> keys;private Long deliveryTimestamp;private String bornHost;private Long bornTimestamp;private int deliveryAttempt;}
6、AbstractMqConsumer 发送mq消息的抽象类
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.boot.CommandLineRunner;import java.nio.charset.StandardCharsets;
import java.util.Collections;@Slf4j
public abstract class AbstractMqConsumer implements CommandLineRunner {public abstract String topic();public abstract String consumerGroup();public abstract String tag();public abstract void process(MessageContext messageContext);@Overridepublic void run(String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "127.0.0.1:9080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = StrUtil.isEmpty(tag()) ? "*" : tag();FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。String consumerGroup = consumerGroup();// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = topic();// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。MessageContext context = toMessageContext(messageView);
// log.info("收到mq消息主体内容:{}",context);try {process(context);} catch (Exception e) {log.error("处理mq消息出现异常,消息已自动丢弃,不会再投入队列:", e);}return ConsumeResult.SUCCESS;}).build();log.info("消费者初始化完成,topic:{},tag:{},consumerGroup:{}", topic, tag, consumerGroup);}private MessageContext toMessageContext(MessageView messageView) {Long deliveryTimestamp = messageView.getDeliveryTimestamp().isPresent() ? messageView.getDeliveryTimestamp().get() : null;return MessageContext.builder().messageId(messageView.getMessageId().toString()).topic(messageView.getTopic()).body(StandardCharsets.UTF_8.decode(messageView.getBody()).toString()).properties(messageView.getProperties()).keys(messageView.getKeys()).deliveryTimestamp(deliveryTimestamp).bornHost(messageView.getBornHost()).deliveryAttempt(messageView.getDeliveryAttempt()).build();}}
7、具体的消费类
topic指定消费者订阅的话题,comsumerGroup指明该消费者属于哪一个消费者分组,tag表明是否要获取指定标签的消息,process代表具体的业务处理逻辑,具体消息的内容可以MessageContext 类里面获取
import lombok.extern.slf4j.Slf4j;
import org.recipe.draw.common.mqcomsumer.abstracts.AbstractMqConsumer;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class DemoConsumer extends AbstractMqConsumer {@Overridepublic String topic() {return "demo";}@Overridepublic String consumerGroup() {return "demo";}@Overridepublic String tag() {return null;}@Overridepublic void process(MessageContext messageContext) {log.info("收到消息:{}", messageContext);}
}
相关文章:
Spring boot封装rocket mq 教程
1、rocket mq版本 5.1.3 2、pom引入rocket mq依赖 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency> 3、发送MQ消息工具类 impor…...
Java Swing手搓童年坦克大战游戏(I)
前言 业余偶尔对游戏有些兴趣,不过这样的时代,硬件软件飞速进步,2D游戏画面都无比精美,之前的8bit像素游戏时代早就过去了,不过那时候有许多让人印象深刻的游戏比如魂斗罗、超级玛丽、坦克大战(Battle City)等等。 学…...
【DevOps-04]】Operate阶段工具
一、简要说明 安装Docker安装Docker-compose二、安装Docker 官网地址:https://www.docker.com文档地址:Docker Docs仓库地址:https://hub.docker.com1、Docker相关网站 官方网站Get Docker | Docker Docs...
力扣2807.在链表中插入最大公约数
思路:遍历链表,对于每一个结点求出它与下一个结点的最大公约数并插入到俩个结点之间 代码: /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}…...
开始刷Leetcode之前你需要知道的 - The basic is all you need
数据结构:列表,哈希表,集合,栈,堆,链表,二叉树,图 入门算法:递归,排序算法,二分法,bfs,dfs list/array 列表常见操作&am…...
【PostgreSQL】模式Schema
PostgreSQL 数据库集群包含一个或多个命名数据库。角色和一些其他对象类型在整个集群中共享。与服务器的客户端连接只能访问单个数据库中的数据,该数据库在连接请求中指定。 数据库包含一个或多个命名schema,而这些schema又包含表。schema还包含其他类型…...
JavaScript实现的复杂功能:自动生成带水印的图片
#程序员的崩溃瞬间 在本文中,我们将讨论一个JavaScript实现的复杂功能,该功能可以自动为图片添加水印。这个功能在许多场景中都非常有用,例如,如果你想保护你的图片版权,或者你想在你的网站上显示自定义的水印。 一、…...
图神经网络|8.2 图卷积的计算基本方法
不同于一般的神经网络,网络层数的并不用特别多。 原因是只需要少数次数迭代后(当迭代次数为图上的直径?任意两点最短距离的最大值?),某节点便可获取得到图上所有的节点。 通俗的理解是,在社会中…...
equals()与hashCode()方法详解
java.lang.Object类中有两个非常重要的方法: 1 2 public boolean equals(Object obj) public int hashCode() Object类是类继承结构的基础,所以是每一个类的父类。所有的对象,包括数组,都实现了在Object类中定义的方法。 回到…...
六、基于Flask、Flasgger、marshmallow的开发调试
基于Flask、Flasgger、marshmallow的开发调试 问题描述调试方法一调试方法二调试方法三 问题描述 现在有一个传入传出为json格式文件的,Flask-restful开发的程序,需要解决如何调试的问题。 #!/usr/bin/python3 # -*- coding: utf-8 -*- # Project :…...
TypeScript 从入门到进阶之基础篇(三) 元组类型篇
系列文章目录 TypeScript 从入门到进阶系列 TypeScript 从入门到进阶之基础篇(一) ts基础类型篇TypeScript 从入门到进阶之基础篇(二) ts进阶类型篇TypeScript 从入门到进阶之基础篇(三) 元组类型篇TypeScript 从入门到进阶之基础篇(四) symbol类型篇 持续更新中… 文章目录 …...
现代CPU的多种运行模式
目前的CPU大多是支持X86-64技术的兼容CPU,这包括AMD64以及Intel的IA32E(后被正式命名为EM64T,Extended Memory 64 Technology),因为AMD64先出,而EM64T与AMD64完全兼容,所以也统一称为AMD64技术。…...
Python PDF处理模块pypdf库详解
概要 PDF(Portable Document Format)是一种常见的文档格式,广泛用于存储和共享文本和图像数据。在 Python 中,有许多库可以用于处理 PDF 文件,其中之一就是 PyPDF。PyPDF 是一个功能强大的库,它允许你读取…...
C++上位软件通过LibModbus开源库和西门子S7-1200/S7-1500/S7-200 PLC进行ModbusTcp 和ModbusRTU 通信
前言 一直以来上位软件比如C等和西门子等其他品牌PLC之间的数据交换都是大家比较头疼的问题,尤其是C上位软件程序员。传统的方法一般有OPC、Socket 等,直到LibModbus 开源库出现后这种途径对程序袁来说又有了新的选择。 Modbus简介 Modbus特点 1 &#…...
PLSQL Developer 15安装和oracle客户端安装
文章目录 前言一、PLSQL Developer1.下载2.安装 二、oracle客户端1.下载2.环境变量 三、使用1. oci2. 连接3. 配置文件 总结 前言 oracle是经常使用的数据库,PLSQL Developer是众多产品中比较不错的一款工具,接下来我们来介绍PLSQL Developer的安装和使…...
【深度deepin】深度安装,jdk,tomcat,Nginx安装
目录 一 深度 1.1 介绍 1.2 与别的操作系统的优点 二 下载镜像文件及VM安装deepin 三 jdk,tomcat,Nginx安装 3.1 JDK安装 3.2 安装tomcat 3.3 安装nginx 一 深度 1.1 介绍 由深度科技社区开发的开源操作系统,基于Linux内核…...
解决flask启动报错:ImportError: DLL load failed while importing _dukpy: 找不到指定的程序
现象: 原因:dukpy没有win32执行库 解决办法: 到lfd.uci.edu 第三方库下载dukpy的win32 whl文件 注意: 需要跟你python版本和windows平台(32位/64位)对应 https://www.lfd.uci.edu/~gohlke/pythonlibs/#…...
腾讯面试总结
腾讯 一面 mysql索引结构?redis持久化策略?zookeeper节点类型说一下;zookeeper选举机制?zookeeper主节点故障,如何重新选举?syn机制?线程池的核心参数;threadlocal的实现ÿ…...
面向对象进阶(static关键字,继承,方法重写,super,this)
文章目录 面向对象进阶部分学习方法:今日内容教学目标 第一章 复习回顾1.1 如何定义类1.2 如何通过类创建对象1.3 封装1.3.1 封装的步骤1.3.2 封装的步骤实现 1.4 构造方法1.4.1 构造方法的作用1.4.2 构造方法的格式1.4.3 构造方法的应用 1.5 this关键字的作用1.5.1…...
Blazor项目如何调用js文件
以下是来自千问的回答并加以整理:(说一句,文心3.5所给的回答不完善,根本运行不起来,4.0等有钱了试试) 在Blazor项目中引用JavaScript文件(.js)以实现与JavaScript的互操作ÿ…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
地震勘探——干扰波识别、井中地震时距曲线特点
目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波:可以用来解决所提出的地质任务的波;干扰波:所有妨碍辨认、追踪有效波的其他波。 地震勘探中,有效波和干扰波是相对的。例如,在反射波…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
