当前位置: 首页 > news >正文

从环境部署到开发实战:消息队列 RocketMQ

文章目录

  • 一、消息队列简介
    • 1.1 什么是消息队列
    • 1.2 常见消息队列对比
    • 1.3 RockectMQ 核心概念
    • 1.4 RockectMQ 工作机制 (★)
  • 二、RocketMQ 部署相关
    • 2.1 服务器单机部署
    • 2.2 管控台页面
  • 三、RocketMQ 的基本使用
    • 3.1 入门案例
    • 3.2 消息发送方式
      • 3.2.1 同步消息
      • 3.2.2 异步消息
      • 3.2.3 一次性消息
    • 3.3 消息消费方式
      • 3.3.1 集群模式
      • 3.3.2 广播模式
    • 3.4 顺序消息
    • 3.5 延迟消息
    • 3.6 消息过滤
      • 3.6.1 Tag 过滤
      • 3.6.2 SQL92 过滤
  • 四、SpringBoot 集成 RocketMQ
    • 4.1 入门案例
    • 4.2 消息发送方式
      • 4.2.1 同步消息
      • 4.2.2 异步消息
      • 4.2.3 一次性消息
    • 4.3 消息消费方式
      • 4.3.1 集群模式
      • 4.3.2 广播模式
    • 4.4 顺序消息
    • 4.5 延时消息
    • 4.6 消息过滤
      • 4.6.1 Tag 过滤
      • 4.6.2 SQL92 过滤


一、消息队列简介

1.1 什么是消息队列

  消息队列(MQ)也叫消息队列中间件,其主要通过消息的发送和接受来实现程序的异步解耦、削峰填谷以及数据分发但是 MQ 真正的目的是为了通讯。他屏蔽了复杂的通讯协议,像常用的 dubbo、http 协议都是同步的。这两种协议很难实现双端通讯(即:A调用B,B也可以主动调用A),而且不支持长链接。MQ 做的就是在这些协议上构建一个简单协议——生产者、消费者模型,MQ 带给我们的不是底层的通讯协议,而是更高层次的通讯模型。他定义了两个对象:发送数据的叫做生产者,接受消息的叫做消费者,我们可以无视底层的通讯协议,并且可以自己定义生产者消费者

参考:消息队列详解


1.2 常见消息队列对比

在这里插入图片描述


1.3 RockectMQ 核心概念

生产者 Producer负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

消费者 Consumer负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

名字服务 Name Server名称服务充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个Namesrv 实例组成集群,但相互独立,没有信息交换。

代理服务器 Broker Server消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

消息主题 Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位

消息队列 MessageQueue:对于每个 Topic 都可以设置一定数量的消息队列用来进行数据的读取。

消息内容 Message消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

标签 Tag为消息设置的标志,用于同一主题 Topic 下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。


1.4 RockectMQ 工作机制 (★)

  RockectMQ 有自己的注册中心,即 NameServer,连接命名服务后会拉取代理服务器的列表到本地缓存,生产者会通过负载均衡选出代理服务器的具体 IP,然后向选出的代理服务器发送消息,最终发送给消费者进行消费。如果发送的消息含有标签Tag,那么会在消费者消费时进行消息的过滤
在这里插入图片描述
  其中,每个 Topic 默认有 4 个 MessageQueue,即 4 个写和读队列。在消息中间件每个 topic 设置 4 个队列,主要是为了解决并发性能的问题。如果只有一个队列,为保证线程安全,必须得给队列进行写操作时上锁。设置 4 个队列也是由于大部分的服务器核心数都是 4 核的。

在这里插入图片描述


二、RocketMQ 部署相关

2.1 服务器单机部署

搜索资源:rocketmq-all-4.4.0-bin-release.zip

① 将压缩包上传服务器,把rocketmq-all-4.4.0-bin-release.zip 拷贝到 /usr/local/software
② 使用解压命令进行解压到 /usr/local 目录

unzip /usr/local/software/rocketmq-all-4.4.0-bin-release.zip -d /usr/local

③ 软件文件名重命名

mv  /usr/local/rocketmq-all-4.4.0-bin-release/  /usr/local/rocketmq-4.4/

④ 设置环境变量

vi /etc/profileexport JAVA_HOME=/usr/local/jdk1.8
export ROCKETMQ_HOME=/usr/local/rocketmq-4.4
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH

修改环境变量后,需要 source /etc/profile 使配置文件生效。

⑤ 修改脚本中的 JVM 相关参数和启动参数的配置

vi  /usr/local/rocketmq-4.4/bin/runbroker.shJAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
vi  /usr/local/rocketmq-4.4/bin/runserver.shJAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

⑥ 修改配置文件,设置 Nameserver 和 Broker-server 部署机器的IP地址。

 vi /usr/local/rocketmq-4.4/conf/broker.conf

在这里插入图片描述

注:如果是服务器本身可以不设置

⑦ 启动 NameServer

# 1.启动NameServer,& 代表后台输出
nohup sh mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log

⑧ 启动 Broker

#1.启动Broker
# nohup sh mqbroker -n 部署的IP地址:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &
nohup sh mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &#2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log 

⑨ 使用 jps命令查看是否开启成功,如果看到NamesrvStartupBrokerStartup这两个进程,则证明启动成功。

在这里插入图片描述

关闭 nameserver:sh mqshutdown namesrv
关闭 broker:sh mqshutdown broker

另外:服务器需要暂时关闭防火墙 systemctl stop firewalld,并可使用 firewall-cmd --state 查看防火墙状态。
具体可参考:Linux关闭防火墙命令


2.2 管控台页面

搜索资源:rocketmq-console-ng-1.0.1.jar

在 jar 包的文件夹下新建一个配置文件 application.properties,编辑管控台的端口和 NameServer 中心的 IP 地址,使用 java -jar rocketmq-console-ng-1.0.1.jar 启动即可。
在这里插入图片描述

在这里插入图片描述
访问 http://localhost:9999/#/管控台界面如下:
在这里插入图片描述

注:管控台要求 jdk1.8。


三、RocketMQ 的基本使用

在一个工程中创建两个模块,模拟生产者和消费者:
在这里插入图片描述

3.1 入门案例

添加 rocketmq 的 pom 依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>

生产者模块生产消息:

public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.101:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "helloTopic";// 发送消息for (int i = 0; i < 3; i++) {Message message = new Message(topic, ("RocketMQ普通消息:" + i).getBytes(Charset.defaultCharset()));// 发送完成之后会返回响应结果SendResult result = producer.send(message);System.out.println("发送状态:" + result.getSendStatus());}System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}

在这里插入图片描述

消费者模块消费消息:

public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("helloTopic", "*"); // * 消息不过滤// 设置消费模式,默认集群// consumer.setMessageModel(MessageModel.CLUSTERING);// 设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {String content = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:" + Thread.currentThread() + "消息内容:" + content);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 通知MQ消费正常// return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 通知MQ消费失败,消费者会重新消费}});// 启动的消费者consumer.start();}
}

在这里插入图片描述


3.2 消息发送方式

3.2.1 同步消息

可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。上面演示的案例,就是同步消息发送方式。
在这里插入图片描述

应用程序给消息中间件发送消息的时候,需要等待消息中间件将消息存储完毕后才响应回去,业务代码才能往下执行。

发送方式:SendResult result = producer.send(msg);

上面演示的案例就是同步发送。


3.2.2 异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。

在这里插入图片描述

应用程序发送消息,消息中间件收到这个消息之后,直接给应用程序响应(此时消息并没有完全存储到磁盘),消息中间件继续存储消息,通过回调地址通知有应用程序存储的结果(成功或失败)。

发送方式:

 producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {...}@Overridepublic void onException(Throwable throwable) {...}
});
public class AsynchronousProducer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.100:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "helloTopic";// 发送消息Message message = new Message(topic, ("RocketMQ异步消息").getBytes(Charset.defaultCharset()));System.out.println("消息发送前");// 异步发送,需要传递异步回调消息producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息存储状态:" + sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送出现异常");}});System.out.println("消息发送完毕");TimeUnit.SECONDS.sleep(5); // 为了等回调消息,模拟程序睡眠5s后关闭资源// 关闭资源producer.shutdown();}
}

在这里插入图片描述


3.2.3 一次性消息

一次性消息主要用在不特别关心发送结果的场景,例如:日志发送。

在这里插入图片描述

发送方式:producer.sendOneway(message);

应用程序给消息中间件发送消息的时候,不需要知道消息是否在消息中间存储了,只管发就是了。

public class OneTimeProducer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.100:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "helloTopic";// 发送消息Message message = new Message(topic, ("RocketMQ一次性消息").getBytes(Charset.defaultCharset()));System.out.println("消息发送前");// 一次性消息producer.sendOneway(message);System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}

在这里插入图片描述


3.3 消息消费方式

3.3.1 集群模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。
在这里插入图片描述

由于内部会根据 index++ % queue.size() 的方式来决定消息进哪个 messageQueue,因此当多个机器做集群的时候,也可能会发生消息消费分配不均等情况。如下面 topic 中一共有 10 个消息:

在这里插入图片描述

入门案例默认就是集群的消费方式:
在这里插入图片描述


3.3.2 广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。

在这里插入图片描述
设置消费模式:consumer.setMessageModel(MessageModel.BROADCASTING);

在这里插入图片描述


3.4 顺序消息

从上文的消费结果来看,在集群状态下,消息的消费顺序是乱序的,但有些场景是要求消息的消费是有序的,这要怎么实现呢?我们考虑以下两个场景:
① 如果在消费者做集群的情况下,由于消息会分散在不同的队列中,因此消息不可保证顺序消费,如:第四个消息比第一个消息更早被消费。因此,可以考虑将消息全放在一个队列中

在这里插入图片描述
注:一个队列只会被一个消费者实例消费,一个消费者实例可以消费多个队列

② 我们设置消费者的监听模式的时候使用的是 MessageListenerConcurrently 即多线程并发消费的形式,那么当消息全存储在一个队列时,由于 CPU 执行权等问题,消费者实例中多线程会并发的进行消费,也不会保证顺序消费。

// 一个队列对应一个实例的多个线程
consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {...}
});

在这里插入图片描述

因此,可以使用 MessageListenerOrderly 让一个队列只对应一个线程

// 从什么地方开始消费,队头开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 一个队列对应一个实例的一个线程
consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {...}
});

在这里插入图片描述

总结:如果想要实现顺序消费,在生产者的角度将消息存储在一个队列中,在消费者的角度就是将消息对应消费者实例里的一个线程


顺序消费案例,生产者代码如下:

public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.100:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();// 设置队列的选择器// 将需要顺序消费的消息存储到同一个队列中MessageQueueSelector selector = new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("队列的个数:" + list.size()); // 队列个数:4Long orderId = (Long)o; // 传入的参数int index = (int)(orderId % list.size());return list.get(index);}};// 发送消息for (OrderStep orderStep : orderSteps) {Message msg = new Message(topic, orderStep.toString().getBytes(Charset.defaultCharset()));// 指定消息选择器,传入的参数producer.send(msg, selector, orderStep.getOrderId()); // 将订单号传入选择器}System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}

消费者代码如下:

public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyProducerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("orderTopic", "*");// 从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 一个队列对应的一个线程consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",队列ID:" + msg.getQueueId() + ",消息内容:"+ new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});// 启动的消费者consumer.start();}
}

模拟数据:

@Setter
@Getter
public class OrderStep {private long orderId;private String desc;@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}
}
public class OrderUtil {/*** 生成模拟订单数据*/public static List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}

控制台打印:

在这里插入图片描述
从控制台打印效果来看,102 订单全部存储在 ID 为 2 的队列当中,并且实现了顺序消费。


3.5 延迟消息

延时消息是 RocketMQ 延时发送给消费者消费的消息,典型应用场景如:订单超时未支付等。其不支持任意时间的延时,需要设置几个固定的延时等级,从 1s 到 2h 分别对应着等级 1 到 18。

等级123456789101112131415161718
延时1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

生产者:

public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.101:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "helloTopic";// 发送消息SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Message message =new Message(topic, ("延时消息,发送时间:" + cusFormat.format(new Date())).getBytes(Charset.defaultCharset()));// 设置消息延时级别message.setDelayTimeLevel(3);// 发送完成之后会返回响应结果SendResult result = producer.send(message);System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}

消费者:

public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delayConsumerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("helloTopic", "*");// 设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");for (MessageExt msg : list) {System.out.println("消费时间:" + cusFormat.format(new Date()) + ",消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();}
}

控制台打印:
在这里插入图片描述


3.6 消息过滤

3.6.1 Tag 过滤

RocketMQ 的消息标签(Message Tag)是一种简单的路由机制,允许消费者根据标签来过滤并只消费感兴趣的消息。要实现消息标签的过滤,需要在发送消息时设置标签,并在消费者端配置标签过滤器。以下示例展示如何使用的标签过滤功能:

生产者设置消息标签并发送消息:

public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("tagProducesGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.101:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "tagFilterTopic";// 发送消息Message message1 = new Message(topic, "TagA", ("消息A").getBytes(Charset.defaultCharset()));Message message2 = new Message(topic, "TagB", ("消息B").getBytes(Charset.defaultCharset()));Message message3 = new Message(topic, "TagC", ("消息C").getBytes(Charset.defaultCharset()));producer.sendOneway(message1);producer.sendOneway(message2);producer.sendOneway(message3);System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}

消费者端配置标签过滤器:

public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("tagFilterTopic", "TagA || TagC"); // 只消费 TagA 和 TagCconsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println("消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();}
}

控制台打印:

在这里插入图片描述


3.6.2 SQL92 过滤

RocketMQ 的 SQL92 过滤器是一种基于消息属性的条件筛选机制,允许消费者只接收满足特定条件的消息。要使用 SQL92 过滤器,需要在消费者端设置过滤条件。以下是示例,演示如何在消费者端设置 SQL92 过滤器:

生产者设添加属性 putUserProperty(key, value)

public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("sqlProducesGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.100:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "sqlFilterTopic";// 发送消息Message message1 = new Message(topic, "TagA", ("消息A").getBytes(Charset.defaultCharset()));message1.putUserProperty("age", "22");message1.putUserProperty("weight", "45");Message message2 = new Message(topic, "TagB", ("消息B").getBytes(Charset.defaultCharset()));message2.putUserProperty("age", "30");message2.putUserProperty("weight", "50");Message message3 = new Message(topic, "TagC", ("消息C").getBytes(Charset.defaultCharset()));message3.putUserProperty("age", "15");message3.putUserProperty("weight", "48");producer.sendOneway(message1);producer.sendOneway(message2);producer.sendOneway(message3);System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}

消费者设置过滤条件:

public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age<25 and weight<47"));// 一个队列对应的一个线程consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println("消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();}
}

在这里插入图片描述

注意:
① 过滤条件支持以下形式:

数值比较,如:>,>=,<,<=,BETWEEN,=
字符比较,如:=,<>,IN
IS NULL 或者 IS NOT NULL
逻辑符号 AND,OR,NOT

常量支持类型为:
数值,如:**123,3.1415;
字符,如:‘abc’,必须用单引号包裹起来
NULL,特殊的常量
布尔值,TRUE 或 FALSE

② 在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf,添加参数enablePropertyFilter=true,重启 broker 代理服务器。

vi /usr/local/rocketmq-4.4/conf/broker.conf

在这里插入图片描述


四、SpringBoot 集成 RocketMQ

4.1 入门案例

1、添加 pom 依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version>
</dependency>

2、生产者模块添加配置

rocketmq:name-server: 192.168.63.101:9876producer:group: my-group

3、消费者模块添加配置

rocketmq:name-server: 192.168.63.101:9876

4、生产者模块生产消息

@SpringBootTest
public class RocketMQTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void sendMsg() {Message<String> msg = MessageBuilder.withPayload("发送消息").build();rocketMQTemplate.send("helloTopicBoot", msg);}
}

注意:① Message 对象是 Spring 框架提供的对象 import org.springframework.messaging.Message;
   ② rocketMQTemplate.send(destination, message) 方法是同步的发送方式。

5、消费者模块消费消息

@Component
// 消费者名字叫 helloConsumerGroup,消费的生产组叫 helloTopicBoot
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot", messageModel = MessageModel.BROADCASTING)
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("收到消息:" + new String(messageExt.getBody(), Charset.defaultCharset()));}
}

4.2 消息发送方式

下文不阐述具体的发送细节,细节参考上文。

4.2.1 同步消息

@Test
public void sendSYNMsg() throws InterruptedException {Message<String> msg = MessageBuilder.withPayload("发送同步消息").build();rocketMQTemplate.syncSend("helloTopicBoot", msg);
}

4.2.2 异步消息

@Test
public void sendASYNMsg() throws InterruptedException {Message<String> msg = MessageBuilder.withPayload("发送异步消息").build();rocketMQTemplate.asyncSend("helloTopicBoot", msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送状态:" + sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送失败");}});TimeUnit.SECONDS.sleep(5);
}

4.2.3 一次性消息

@Test
public void sendOnewayMsg() {Message<String> msg = MessageBuilder.withPayload("发送一次性消息").build();rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
}

4.3 消息消费方式

这里模拟生产者发送一次性消息10次:

@Test
public void sendOnewayMsgLoop() {for (int i = 0; i < 10; i++) {Message<String> msg = MessageBuilder.withPayload("发送一次性消息:" + i).build();rocketMQTemplate.sendOneWay("helloTopicBoot", msg);}
}

4.3.1 集群模式

默认情况下,消费者采用负载均衡方式消费消息,即采用集群模式,也可以在 @RocketMQMessageListener 注解中设置 messageModel 属性来改变消费模式。

@Component
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot",messageModel = MessageModel.CLUSTERING) // 设置消费模式
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("收到消息:" + new String(messageExt.getBody(), Charset.defaultCharset()));}
}

在这里插入图片描述

控制台打印:

在这里插入图片描述


4.3.2 广播模式

设置广播模式:messageModel = MessageModel.BROADCASTING

@Component
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING) // 设置消费模式
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {...}
}

控制台打印:

在这里插入图片描述


4.4 顺序消息

生产者设置队列选择器,需要将顺序消息放在同一个队列:

@Test
public void sendOrderlyMsg() {// 设置队列的选择器// 将需要顺序消费的消息存储到同一个队列中rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {String orderIdStr = (String)o; // 传入参数Long orderId = Long.valueOf(orderIdStr);int index = (int)(orderId % list.size());return list.get(index);}});List<OrderStep> orderSteps = OrderUtil.buildOrders();// 发送消息for (OrderStep step : orderSteps) {Message<String> msg = MessageBuilder.withPayload(step.toString()).build();rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot", msg, String.valueOf(step.getOrderId()));}
}

消费者默认一个队列是线程并发消费,可以通过设置 consumeMode = ConsumeMode.ORDERLY,将一个消息队列对应消费者的一个线程,以实现顺序消费:

@Component
@RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot", topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY) // 设置一个队列对应一个线程
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("当前线程:" + Thread.currentThread().getId() + ",队列ID:" + messageExt.getQueueId() + ",消息内容:"+ new String(messageExt.getBody(), Charset.defaultCharset()));}
}

在这里插入图片描述

控制台打印:
在这里插入图片描述


4.5 延时消息

@Test
public void sendDelayMsg() {SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Message<String> msg = MessageBuilder.withPayload("发送延时消息,发送时间:" + cusFormat.format(new Date())).build();// 设置延时等级3,这个消息将在 10s 之后发送(详看delayTimeLevel),消息在任务队列里存储,10s 后发送// 3000 代表同步等待 3s,若超过 3s 消息队列都没有响应,自动断开链接rocketMQTemplate.syncSend("helloTopicBoot", msg, 3000, 3);
}

在这里插入图片描述


4.6 消息过滤

4.6.1 Tag 过滤

生产者生产消息,Topic 和 Tag 以 “:” 分割( “:” 前后不能有空格)

@Test
public void sendTagFilterMsg() {Message<String> msg1 = MessageBuilder.withPayload("消息A").build();rocketMQTemplate.send("tagFilterBoot:TagA", msg1);Message<String> msg2 = MessageBuilder.withPayload("消息B").build();rocketMQTemplate.send("tagFilterBoot:TagB", msg2);Message<String> msg3 = MessageBuilder.withPayload("消息C").build();rocketMQTemplate.send("tagFilterBoot:TagC", msg3);
}

消费者设置过滤条件:

@Component
@RocketMQMessageListener(consumerGroup = "tagFilterConsumerBoot", topic = "tagFilterBoot", selectorExpression = "TagA || TagC") // selectorExpression 过滤条件
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("消息内容:" + new String(messageExt.getBody(), Charset.defaultCharset()));}
}

控制台打印:
在这里插入图片描述


4.6.2 SQL92 过滤

使用 setHeader(String headerName, Object headerValue) 方法设置过滤条件:

@Test
public void sendSQL92FilterMsg() {Message<String> msg1 = MessageBuilder.withPayload("美女A").setHeader("age", 22).setHeader("weight", 90).build();rocketMQTemplate.send("SQL92FilterBoot", msg1);Message<String> msg2 = MessageBuilder.withPayload("美女B").setHeader("age", 20).setHeader("weight", 100).build();rocketMQTemplate.send("SQL92FilterBoot", msg2);Message<String> msg3 = MessageBuilder.withPayload("美女C").setHeader("age", 25).setHeader("weight", 120).build();rocketMQTemplate.send("SQL92FilterBoot", msg3);
}

消费者设置 selectorTypeselectorExpression

@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterConsumerBoot", topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92, selectorExpression = "age<25 and weight>90") // 设置
public class Sql92FilterTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("消息内容:" + new String(messageExt.getBody(), Charset.defaultCharset()));}
}

控制台打印:
在这里插入图片描述

注:在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf,具体参考 3.6.2


文章参考:Java微服务商城高并发秒杀项目实战|Spring Cloud Alibaba真实项目实战+商城双11秒杀+高并发+消息+支付+分布式事物Seata

相关文章:

从环境部署到开发实战:消息队列 RocketMQ

文章目录 一、消息队列简介1.1 什么是消息队列1.2 常见消息队列对比1.3 RockectMQ 核心概念1.4 RockectMQ 工作机制 &#xff08;★&#xff09; 二、RocketMQ 部署相关2.1 服务器单机部署2.2 管控台页面 三、RocketMQ 的基本使用3.1 入门案例3.2 消息发送方式3.2.1 同步消息3.…...

【机器学习(九)】分类和回归任务-多层感知机(Multilayer Perceptron,MLP)算法-Sentosa_DSML社区版

文章目录 一、算法概念二、算法原理&#xff08;一&#xff09;感知机&#xff08;二&#xff09;多层感知机1、隐藏层2、激活函数sigma函数tanh函数ReLU函数 3、反向传播算法 三、算法优缺点&#xff08;一&#xff09;优点&#xff08;二&#xff09;缺点 四、MLP分类任务实现…...

渗透测试-文件上传绕过思路

文件上传绕过思路 引言 分享一些文件上传绕过的思路&#xff0c;下文内容多包含实战图片&#xff0c;所以打码会非常严重&#xff0c;可多看文字表达&#xff1b;本文仅用于交流学习&#xff0c; 由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#x…...

等保测评中的密码学应用分析

等保测评中密码学应用的分析 等保测评&#xff08;信息安全等级保护测评&#xff09;是中国信息安全领域的一项重要活动&#xff0c;旨在评估信息系统的安全性&#xff0c;并根据评估结果给予相应的安全等级。在等保测评中&#xff0c;密码学应用分析是评估信息系统安全性的关键…...

LCR 007. 三数之和

文章目录 1.题目2.思路3.代码 1.题目 LCR 007. 三数之和 给定一个包含 n 个整数的数组 nums&#xff0c;判断 nums 中是否存在三个元素 a &#xff0c;b &#xff0c;c *&#xff0c;*使得 a b c 0 &#xff1f;请找出所有和为 0 且 不重复 的三元组。 示例 1&#xff1a…...

【入门01】arcgis api 4.x 创建地图、添加图层、添加指北针、比例尺、图例、卷帘、图层控制、家控件(附完整源码)

1.效果 2.代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title></title><link rel"s…...

STL迭代器标签

STL&#xff08;标准模板库&#xff09;迭代器标签是用来标识迭代器类型的分类机制。这些标签有助于确定迭代器的特性&#xff0c;比如它是否可以进行随机访问、是否支持修改元素等。主要的迭代器标签包括&#xff1a; Input Iterator&#xff1a;只读迭代器&#xff0c;可以顺…...

容器学习之SparseArray源码解析

1、SparseArray是android sdk 提供集合类&#xff0c;主要用来替换key 为int类型&#xff0c;value为Object类型的Hashmap 2、SparseArray和HashMap相比优缺点&#xff1a; 优点&#xff1a; 1、SparseArray存在一个int[]keys, 因此避免自动装箱 2、SparseArray扩容时只需要数…...

信创改造技术介绍

目录 服务发现和注册 Sentinel 核心功能 典型应用场景 gateway 网关的主要功能 Spring Cloud Gateway Kong Kong 的主要功能 Kong 的架构&#xff1a; Kong 的使用场景&#xff1a; Kong 的部署模式&#xff1a; 优势 Gateway与Sentinel区别 Gateway Sentinel …...

【可见的点——欧拉函数】

在数论&#xff0c;对正整数n&#xff0c;欧拉函数是小于或等于n的正整数中与n互质的数的数目&#xff08;不包括1&#xff09; 题目 思路 有三个点比较特殊&#xff08;因为一来这三个点一定可见&#xff0c;同时也无法用gcd 1判断&#xff09;&#xff1a;&#xff08;0&am…...

Maven重点学习笔记(包入门 2万字)

Maven依赖管理项目构建工具 尚硅谷 5h 2023最新版 一&#xff0c;Maven简介 1.为什么学习Maven 1.1, Maven是一个依赖管理工具 1️⃣ jar包的规模 随着我们使用越来越多的框架&#xff0c;或者框架封装程度越来越高&#xff0c;项目中使用的jar包也越来越多。项目中&…...

1.分页查询(后端)—— Vue3 + SpringCloud 5 + MyBatisPlus + MySQL 项目系列(基于 Zulu 11)

本手册是基于 Vue3 SpringCloud5 MyBatisPlus MySQL 的项目结构和代码实现&#xff0c;旨在作为一个教学案例进行讲解。为了使案例更具普适性&#xff0c;文档中的公司名称、实体类、表名以及字段名称等敏感信息均已脱敏。 项目结构概述 项目采用标准的分层架构&#xff0…...

机器学习与深度学习的区别:深入理解与应用场景

在人工智能&#xff08;AI&#xff09;的广阔领域中&#xff0c;机器学习和深度学习是两个核心概念&#xff0c;它们虽然紧密相关&#xff0c;但在定义、技术、数据处理能力、应用场景等方面存在显著差异。本文将深入探讨这些区别&#xff0c;帮助读者更好地理解并选择合适的技…...

C++学习笔记(45)

322、循环队列、信号量、生产/消费者模型的源代码 一、demo1.cpp // demo1.cpp&#xff0c;本程序演示循环队列的使用。 #include "_public.h" int main() { using ElemTypeint; squeue<ElemType,5> QQ; ElemType ee; // 创建一个数据元素。 cout << &qu…...

【2】图像视频的加载和显示

文章目录 【2】图像视频的加载和显示一、代码在哪写二、创建和显示窗口&#xff08;一&#xff09;导入OpenCV的包cv2&#xff08;二&#xff09;创建窗口&#xff08;三&#xff09;更改窗口大小 & 显示窗口&#xff08;四&#xff09;等待用户输入补充&#xff1a;ord()函…...

1. BOOT.BIN 2. 固化 3. 启动 4. SDK 5. 文件

在进行FPGA的开发与固化过程中&#xff0c;生成BOOT.BIN文件是一个重要的步骤。BOOT.BIN文件通常包含了系统启动所需的不同文件&#xff0c;以下是如何创建和使用该文件的详细说明。 ### 生成BOOT.BIN文件的步骤 1. **方法一&#xff1a;通过项目构建** - 右键单击项目&#xf…...

vue按钮接收键盘回车事件

了解了&#xff01;如果您想让 Submit 按钮在按下回车键时被触发&#xff0c;可以在 Vue 组件中监听全局的键盘事件。以下是实现这一功能的示例&#xff1a; 示例代码 <template><div><inputtype"text"v-model"inputValue"placeholder&qu…...

腾讯云点播及声音上传

文章目录 1、开通腾讯云点播2、获取腾讯云API密钥3、完成声音上传3.1、引入依赖3.2、参考&#xff1a;接入点地域3.3、参考&#xff1a;任务流设置3.4、首先修改配置&#xff1a;3.4.1、 3.5、TrackInfoApiController --》 uploadTrack()3.6、VodServiceImpl --》 uploadTrack(…...

如何查看服务器是否有raid阵列卡以及raid类型

要查看服务器是否配置了RAID阵列卡以及RAID的类型&#xff0c;可以使用多种方法。以下是一些常用的命令和步骤&#xff1a; 1. 使用 lspci 命令 这个命令可以列出所有的PCI设备&#xff0c;包括RAID控制器。 lspci | grep -i raid 如果输出中有RAID相关的设备信息&#xff0c;那…...

工博会动态 | 来8.1馆 看桥田如何玩转全场

北京时间2024年9月24日&#xff0c;中国国际工业博览会开幕&#xff0c;桥田智能&#xff08;8.1馆A001&#xff09;推出心意三重奏&#xff0c;有没有小伙伴们发现呢&#xff1f;现在&#xff0c;让我们一起city walk下&#xff01; 桥田显眼包横空出道 有小伙伴已经发现&…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

基于服务器使用 apt 安装、配置 Nginx

&#x1f9fe; 一、查看可安装的 Nginx 版本 首先&#xff0c;你可以运行以下命令查看可用版本&#xff1a; apt-cache madison nginx-core输出示例&#xff1a; nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机

这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机&#xff0c;因为在使用过程中发现 Airsim 对外部监控相机的描述模糊&#xff0c;而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置&#xff0c;最后在源码示例中找到了&#xff0c;所以感…...