当前位置: 首页 > news >正文

549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28

目录

    • 一、Spring 整合 RocketMQ
      • 1.1 消息生产者
      • 1.2 消息消费者
      • 1.3 Spring 配置文件
      • 1.4 运行实例程序
    • 二、参考链接

一、Spring 整合 RocketMQ

不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件,Spring 社区已经通过多种方式提供了对这些中间件产品集成,例如通过 spring-jms 整合 ActiveMQ、通过 Spring AMQP 项目下的 spring-rabbit 整合 RabbitMQ、通过 spring-kafka 整合 kafka ,通过他们可以在 Spring 项目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三种方式,一是将消息生产者和消费者定义成 bean 对象交由 Spring 容器管理,二是使用 RocketMQ 社区的外部项目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通过 spring-jms 方式集成使用,三是如果你的应用是基于 spring-boot 的,可以使用 RocketMQ 的外部项目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比较方便的收发消息。
总的来讲 rocketmq-jms 项目实现了 JMS 1.1 规范的部分内容,目前支持 JMS 中的发布/订阅模型收发消息。rocketmq-spring-boot-starter 项目目前已经支持同步发送、异步发送、单向发送、顺序消费、并行消费、集群消费、广播消费等特性,如果比较喜欢 Spring Boot 这种全家桶的快速开发框架并且现有特性已满足业务要求可以使用该项目。当然从 API 使用上最灵活的还是第一种方式,下面以第一种方式为例简单看下Spring 如何集成 RocketMQ 的。

1.1 消息生产者

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;public class SpringProducer {private Logger logger = Logger.getLogger(getClass());private String producerGroupName;private String nameServerAddr;private DefaultMQProducer producer;public SpringProducer(String producerGroupName, String nameServerAddr) {this.producerGroupName = producerGroupName;this.nameServerAddr = nameServerAddr;}public void init() throws Exception {logger.info("开始启动消息生产者服务...");//创建一个消息生产者,并设置一个消息生产者组producer = new DefaultMQProducer(producerGroupName);//指定 NameServer 地址producer.setNamesrvAddr(nameServerAddr);//初始化 SpringProducer,整个应用生命周期内只需要初始化一次producer.start();logger.info("消息生产者服务启动成功.");}public void destroy() {logger.info("开始关闭消息生产者服务...");producer.shutdown();logger.info("消息生产者服务已关闭.");}public DefaultMQProducer getProducer() {return producer;}
}

消息生产者就是把生产者 DefaultMQProducer 对象的生命周期分成构造函数、init、destroy 三个方法,构造函数中将生产者组名、NameServer 地址作为变量由 Spring 容器在配置时提供,init 方法中实例化 DefaultMQProducer 对象、设置 NameServer 地址、初始化生产者对象,destroy 方法用于生产者对象销毁时清理资源。

1.2 消息消费者

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;public class SpringConsumer {private Logger logger = Logger.getLogger(getClass());private String consumerGroupName;private String nameServerAddr;private String topicName;private DefaultMQPushConsumer consumer;private MessageListenerConcurrently messageListener;public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {this.consumerGroupName = consumerGroupName;this.nameServerAddr = nameServerAddr;this.topicName = topicName;this.messageListener = messageListener;}public void init() throws Exception {logger.info("开始启动消息消费者服务...");//创建一个消息消费者,并设置一个消息消费者组consumer = new DefaultMQPushConsumer(consumerGroupName);//指定 NameServer 地址consumer.setNamesrvAddr(nameServerAddr);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅指定 Topic 下的所有消息consumer.subscribe(topicName, "*");//注册消息监听器consumer.registerMessageListener(messageListener);// 消费者对象在使用之前必须要调用 start 初始化consumer.start();logger.info("消息消费者服务启动成功.");}public void destroy(){logger.info("开始关闭消息消费者服务...");consumer.shutdown();logger.info("消息消费者服务已关闭.");}public DefaultMQPushConsumer getConsumer() {return consumer;}}

同消息生产者类似,消息消费者是把生产者 DefaultMQPushConsumer 对象的生命周期分成构造函数、init、destroy 三个方法,具体含义在介绍 Java 访问 RocketMQ 实例时已经介绍过了,不再赘述。当然,有了消费者对象还需要消息监听器在接收到消息后执行具体的处理逻辑。

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class MessageListener implements MessageListenerConcurrently {private Logger logger = Logger.getLogger(getClass());public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (list != null) {for (MessageExt ext : list) {try {logger.info("监听到消息 : " + new String(ext.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}

消息监听器类就是把前面 Java 示例中注册消息监听器时声明的匿名内部类代码抽取出来定义成单独一个类而已。

1.3 Spring 配置文件

因为只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要额外添加依赖包了。本例中将消息生产者和消息消费者分成两个配置文件,这样能更好的演示收发消息的效果。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="producerGroupName" value="spring_producer_group"/></bean>
</beans>

消息生产者配置很简单,定义了一个消息生产者对象,该对象初始化时调用 init 方法,对象销毁前执行 destroy 方法,将 Name Server 地址和生产者组配置好。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" /><bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="consumerGroupName" value="spring_consumer_group"/><constructor-arg name="topicName" value="spring-rocketMQ-topic" /><constructor-arg name="messageListener" ref="messageListener" /></bean></beans>

消息消费者同消息生产者配置类似,多了一个消息监听器对象的定义和绑定。

1.4 运行实例程序

按前述步骤 启动 Name Server 和 Broker,接着运行消息生产者和消息消费者程序,简化起见我们用两个单元测试类模拟这两个程序:

package org.study.mq.rocketMQ.spring;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringProducerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");}@Testpublic void sendMessage() throws Exception {SpringProducer producer = container.getBean(SpringProducer.class);for (int i = 0; i < 20; i++) {//创建一条消息对象,指定其主题、标签和消息内容Message msg = new Message("spring-rocketMQ-topic",null,("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */);//发送消息并返回结果SendResult sendResult = producer.getProducer().send(msg);System.out.printf("%s%n", sendResult);}}
}

SpringProducerTest 类模拟消息生产者发送消息。

package org.study.mq.rocketMQ.spring;import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringConsumerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");}@Testpublic void consume() throws Exception {SpringConsumer consumer = container.getBean(SpringConsumer.class);Thread.sleep(200 * 1000);consumer.destroy();}
}

SpringConsumerTest 类模拟消息消费者者接收消息,在 consume 方法返回之前需要让当前线程睡眠一段时间,使消费者程序继续存活才能监听到生产者发送的消息。

分别运行 SpringProducerTest 类 和 SpringConsumerTest 类,在 SpringConsumerTest 的控制台能看到接收的消息:
在这里插入图片描述
假如启动两个 SpringConsumerTest 类进程,因为它们属于同一消费者组,在 SpringConsumerTest 的控制台能看到它们均摊到了消息:
在这里插入图片描述
在这里插入图片描述

二、参考链接

[01] 消息队列之 RocketMQ

相关文章:

549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28

目录一、Spring 整合 RocketMQ1.1 消息生产者1.2 消息消费者1.3 Spring 配置文件1.4 运行实例程序二、参考链接一、Spring 整合 RocketMQ 不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件&#xff0c;Spring 社区已经通过多种方式提供了对这些中间件产品集成&#xff0c;例如通…...

如何使用SpringBoot ⽇志?

Spring Boot自定义日志的打印:在一个类中先获取到打印日志对象&#xff08;日志框架提供的日志对象&#xff0c;而日志框架默认已经集成到Spring Boot里了&#xff0c;springboot默认使用 slf4jlogback);注意&#xff1a;得到日志对象Logger ->来自于slf4j2、使用目志对象提…...

山东大学数字图像处理实验:MATLAB的图像显示方法

文章目录MATLAB 学习实验目的实验原理及方法实验内容MATLAB的图像显示方法实验目的实验内容MATLAB 学习 实验目的 了解 MATLAB 的基本功能及操作方法。掌握典型离散信号的 Matlab 产生和显示。 实验原理及方法 在 MATLAB 中, 序列是用矩阵向量表示, 但它没有包含采样信息, …...

Java缓存面试题——Redis解决方案

文章目录1、什么是缓存击穿&#xff1f;该如何解决2、什么是缓存穿透&#xff1f;该如何解决3、什么是缓存雪崩&#xff1f;该如何解决4、什么是BigKey&#xff1f;该如何解决bigkey的危害发现bigkey解决bigkey5、redis过期策略都有哪些&#xff1f;6、讲一讲Redis缓存的数据一…...

Flink:The generic type parameters of ‘Collector‘ are missing 类型擦除

类型擦除问题处理报错日志描述问题描述报错解决其他方法方法一&#xff1a;TypeInformation方法二&#xff1a;TypeHint报错日志描述 报错日志&#xff1a; The generic type parameters of Collector are missing. In many cases lambda methods dont provide enough informa…...

MySQL查询操作

系列文章目录前言一、简单查询SELECT子句SELECT后面之间跟列名DISTINCT,ALL列表达式列更名WHERE子句WHERE子句中可以使用的查询条件比较运算BETWEEN...AND...集合查询&#xff1a;IN模糊查询LIKE空值比较&#xff1a;IS NULL多重条件查询SELECT 的基本结构ORDER BY子句排序聚集…...

Redis-day01-note

Redis-day01-note 文章目录**Redis-day01-note****安装****配置文件详解****数据类型****字符串类型(string)**列表数据类型&#xff08;List&#xff09;****与python交互**Redis介绍特点及优点 1、开源的&#xff0c;使用C编写&#xff0c;基于内存且支持持久化 2、高性能的…...

嵌入式C基础知识(19)

时序在前面我们说到当处理器要向外设芯片写数据时&#xff0c;需要先将所需访问的外设的地址放在地址总线上&#xff0c;然后&#xff0c;由译码器将地址总线上的数据转换成片选信号&#xff0c;片选信号则使能目标外设芯片&#xff0c;接下来处理器写数据到数据总线上&#xf…...

java 2(程序流程控制)【含例题详解】

java ——程序流程控制 ✍作者&#xff1a;电子科大不知名程序员 &#x1f332;专栏&#xff1a;java学习指导 各位读者如果觉得博主写的不错&#xff0c;请诸位多多支持&#xff1b;如果有错误的地方&#xff0c;欢迎在评论区指出 目录java ——程序流程控制分支结构if-elsesw…...

基于Conda完成创建多版本python环境

文章目录基于Conda完成创建多版本python环境基于Conda完成创建多版本python环境 通过cmd打开conda环境 d:\ProgramData\Anaconda3\Scripts\activate创建python3.7的环境 conda create -n py3.7 python3.7产生错误 Collecting package metadata (repodata.json): failed Unav…...

35岁的测试被裁,公司地位还不如00后...

国内的互联网行业发展较快&#xff0c;所以造成了技术研发类员工工作强度比较大&#xff0c;同时技术的快速更新又需要员工不断的学习新的技术。因此淘汰率也比较高&#xff0c;超过35岁的基层研发类员工&#xff0c;往往因为家庭原因、身体原因&#xff0c;比较难以跟得上工作…...

vue H5跳转小程序报错:config:fail,Error: 系统错误,错误码:63002,invalid signature

微信开发者工具下载地址与更新日志 错误码&#xff1a;63002,invalid signature 无效的签名 附录5 微信网页开发 /JS-SDK说明文档 微信 JS 接口签名校验工具 全局返回码说明 ​ 排查步骤 确认签名算法正确&#xff0c;可用 http://mp.weixin.qq.com/debug/cgi-bin/sand…...

来面试阿里测开工程师,HR问我未来3-5年规划,我给HR画个大饼。

在面试的过程中是不是经常被面试官问未来几年的职业规划?你会答吗&#xff1f;是不是经常脑袋里一片空白&#xff0c;未来规划&#xff1f;我只是想赚更多的钱啊&#xff0c;哈哈哈&#xff0c;今天我来教大家&#xff0c;如何给面试官画一个大饼&#xff0c;让他吃的不亦乐乎…...

【2373. 矩阵中的局部最大值】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给你一个大小为 n x n 的整数矩阵 grid 。 生成一个大小为 (n - 2) x (n - 2) 的整数矩阵 maxLocal &#xff0c;并满足&#xff1a; maxLocal[i][j] 等于 grid 中以 i 1 行和 j 1 列为中心的 3 …...

Read book Netty in action(Chapter VII)--ChannelHandler和ChannelPipeline

序言 我们曾经学过了ByteBuf – netty的数据容器&#xff0c;还有ChannelHandler和ChannelPipeline&#xff0c;这一把将他们组合起来&#xff0c;这些组件的交互正是Netty的灵魂所在&#xff01; ChannelHanlder家族 在详细地学习ChannelHanlder之前&#xff0c;我们将在Ne…...

react的严格模式 和 解决react useEffect执行两次

useEffect执行两次 这个问题&#xff0c;主要是刚接触react的时候发的问题&#xff0c;当时也没总结。现在回过头来再总结一次&#xff01;&#xff01;&#xff01; 文章目录useEffect执行两次前言一、为什么useEffect执行两次1.React的严格模式&#xff08;模版创建项目&…...

C++中的STL

一、概念 STL&#xff0c;英文全称 standard template library&#xff0c;中文可译为标准模板库或者泛型库&#xff0c;其包含有大量的模板类和模板函数&#xff0c;是 C 提供的一个基础模板的集合&#xff0c;用于完成诸如输入/输出、数学计算等功能。 STL 最初由惠普实验室…...

【沐风老师】3dmax一键窗户生成器插件使用方法详解

3dmax一键窗户生成器插件教程 3dMax一键窗户生成器是一个在3dMax中自动创建3D窗户模型的脚本。它有28种风格的窗户样式&#xff0c;可以在Archviz项目中灵活应用&#xff0c;同时为3D艺术家节省大量时间。 【适用版本】 适用3dMax 2018.2及更高版本 【安装方法】 1.解压缩包&…...

【图像处理】数字图像处理基础(分辨率,像素,显示...)

Table of Contents1.数字图像处理基础1.1 图像表示1.1.1 图像成像模型1.1.2 数字图像的表示a.图像采样b.图像灰度的量化c.算比特数1.2 分辨率1.2.1 空间分辨率1.2.2 灰度分辨率1.3 像素间的关系1.3.1 像素邻域a.4邻域b.4对角邻域c.8邻域1.3.2 像素邻接1.3.3 像素连通1.3.4 像素…...

UE实现相机飞行效果CesiumForUnreal之DynamicPawn飞行原理浅析

文章目录 1.实现目标2.实现过程2.1 FlyTo实现原理与代码2.2 DynamicPawn飞行原理3.参考资料1.实现目标 基于CesiumForUnreal的Dynamic Pawn实现飞行效果GIF动图: 2.实现过程 实现原理较为简单,基于CesiumForUnreal插件中DynamicPawn中的Camera实现相关功能。其中FlyTo直接通…...

AIGC被ChatGPT带火!底层基础算力有望爆发式增长

ChatGPT火爆全球的背后&#xff0c;可以窥见伴随人工智能技术的发展&#xff0c;数字内容的生产方式向着更加高效迈进。ChatGPT属于AIGC的具体应用&#xff0c;而AIGC是技术驱动的数字内容新生产方式。AIGC类产品未来有望成为5G时代新的流量入口&#xff0c;率先受益的有望是AI…...

【链表OJ题(一)】移除链表元素

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;数据结构 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录链表OJ题(一)1. 移除…...

【解锁技能】学会Python条件语句的终极指南!

文章目录前言一. python条件语句的介绍1.1 什么是条件语句1.2 条件语句的语法1.3 关于内置函数bool()二. 分支语句之单分支三. 多分支语句3.1 二分支语句3.2 多分支语句3.3 嵌套循环总结前言 &#x1f3e0;个人主页&#xff1a;欢迎访问 沐风晓月的博客 &#x1f9d1;个人简介&…...

如何通过rem实现移动端的适配?

一、rem、em、vw\vh的区别&#xff1a; rem&#xff1a;参照HTML根元素的font-size em&#xff1a;参照自己的font-size vw/vh&#xff1a;将视口宽高平分100等份&#xff0c;数值就是所占比例 <!DOCTYPE html> <html lang"en"><head><meta…...

【论文阅读】-姿态识别

记录论文阅读&#xff0c;希望能了解我方向的邻域前沿吧 粗读 第一篇 ATTEND TO WHO YOU ARE: SUPERVISING SELF-ATTENTION FOR KEYPOINT DETECTION AND INSTANCE-AWARE ASSOCIATION 翻译&#xff1a;https://editor.csdn.net/md?not_checkout1&spm1001.2014.3001.5352…...

3.1 模拟栈+表达式求值

模拟栈 题目链接 栈的数组模拟非常简单&#xff0c;不详细描述 设置一个指针指向栈顶第一个元素即可 STL中stack实现已经更新在STL_Stack #include<iostream> #include<string>using namespace std;const int N1e51; int m; string s; int stack[N]; int p;//指针…...

【Python语言基础】——Python 创建表

Python语言基础——Python 创建表 文章目录 Python语言基础——Python 创建表一、Python 创建表一、Python 创建表 创建表 如需在 MySQL 中创建表,请使用 “CREATE TABLE” 语句。 请确保在创建连接时定义数据库的名称。 实例 创建表 “customers”: import mysql.connector…...

外贸建站,为什么别人的询盘更多更精准?

大多企业进行外贸建站的目的就是想要获得更多的精准询盘&#xff0c;但是具体该如何做&#xff0c;大多企业都没有方向&#xff0c;要么就是在网上看各种不系统的文章学着操作&#xff0c;要么就找个建站公司做好网站就不管了&#xff0c;而最终结果都不甚理想。那么怎样才能让…...

Gateway集成Netty服务

Gateway和Netty都有盲区的感觉&#xff1b; 一、Netty简介 Netty是一个异步的&#xff0c;事件驱动的网络应用框架&#xff0c;用以快速开发高可靠、高性能的网络应用程序。 传输服务&#xff1a;提供网络传输能力的管理&#xff1b; 协议支持&#xff1a;支持常见的数据传输…...

SpringMVC控制层private方法中出现注入的service对象空指针异常

一、现象 SpringMVC中controller里的private接口中注入的service层的bean为null&#xff0c;而同一个controller中访问修饰符为public和protected的方法不会出现这样的问题。 controller中的方法被AOP进行了代理&#xff0c;普通Controller如果没有AOP&#xff0c;private方法…...

阿里云做网站可以免备案吗/seo综合查询工具有什么功能

2019独角兽企业重金招聘Python工程师标准>>> 一&#xff0c;支付清算体系的简介 支付清算体系是一个国家的金融基础设施&#xff0c;或说公共服务。我国由央行主管此事&#xff0c;目前大体维持“结算-清算”二级制的支付体系。通俗地讲&#xff0c;银行与商户、消费…...

wordpress做文字站/网站百度收录突然消失了

前言 继上一篇博客已经很长时间没更新了&#xff0c;这段时间空下来可以在继续跟大家共同学习共同进步了&#xff0c;在这之间的时间有很多朋友关注着&#xff0c;也加了我QQ问了一些问题&#xff0c;我最近也会把大家问的问题收集下来&#xff0c;然后做个集锦&#xff0c;相…...

wap网站如何做/他达拉非

假定组件文件为&#xff1a;./src/components/Books.vue//1、引入&#xff1a;import BScroll from better-scroll//2、使用<template><div class"wrapper"><ul><div v-show"downShow">加载中…………………………</div><…...

淘客没有网站难做/做一个公司网站要多少钱

MySQL 默认有个root用户&#xff0c;但是这个用户权限太大&#xff0c;一般只在管理数据库时候才用。如果在项目中要连接 MySQL 数据库&#xff0c;则建议新建一个权限较小的用户来连接。在 MySQL 命令行模式下输入如下命令可以为 MySQL 创建一个新用户&#xff1a;新用户创建完…...

网络广告营销的实现方式/seo具体优化流程

我们要为路由提供请求的URL和其他需要的GET及POST参数&#xff0c;随后路由需要根据这些数据来执行相应的代码&#xff08;这里“代码”对应整个应用的第三部分&#xff1a;一系列在接收到请求时真正工作的处理程序&#xff09;。 因此&#xff0c;我们需要查看HTTP请求&#…...

wordpress图片上传到哪里/搜索引擎谷歌入口

...