从环境部署到开发实战:消息队列 RocketMQ
文章目录
- 一、消息队列简介
- 1.1 什么是消息队列
- 1.2 常见消息队列对比
- 1.3 RockectMQ 核心概念
- 1.4 RockectMQ 工作机制 (★)
- 二、RocketMQ 部署相关
- 2.1 服务器单机部署
- 2.2 管控台页面
- 三、RocketMQ 的基本使用
- 3.1 入门案例
- 3.2 消息发送方式
- 3.2.1 同步消息
- 3.2.2 异步消息
- 3.2.3 一次性消息
- 3.3 消息消费方式
- 3.3.1 集群模式
- 3.3.2 广播模式
- 3.4 顺序消息
- 3.5 延迟消息
- 3.6 消息过滤
- 3.6.1 Tag 过滤
- 3.6.2 SQL92 过滤
- 四、SpringBoot 集成 RocketMQ
- 4.1 入门案例
- 4.2 消息发送方式
- 4.2.1 同步消息
- 4.2.2 异步消息
- 4.2.3 一次性消息
- 4.3 消息消费方式
- 4.3.1 集群模式
- 4.3.2 广播模式
- 4.4 顺序消息
- 4.5 延时消息
- 4.6 消息过滤
- 4.6.1 Tag 过滤
- 4.6.2 SQL92 过滤
一、消息队列简介
1.1 什么是消息队列
消息队列(MQ)也叫消息队列中间件,其主要通过消息的发送和接受来实现程序的异步解耦、削峰填谷以及数据分发,但是 MQ 真正的目的是为了通讯。他屏蔽了复杂的通讯协议,像常用的 dubbo、http 协议都是同步的。这两种协议很难实现双端通讯(即:A调用B,B也可以主动调用A),而且不支持长链接。MQ 做的就是在这些协议上构建一个简单协议——生产者、消费者模型,MQ 带给我们的不是底层的通讯协议,而是更高层次的通讯模型。他定义了两个对象:发送数据的叫做生产者,接受消息的叫做消费者,我们可以无视底层的通讯协议,并且可以自己定义生产者消费者。
参考:消息队列详解
1.2 常见消息队列对比
1.3 RockectMQ 核心概念
生产者 Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。
消费者 Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
名字服务 Name Server:名称服务充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个Namesrv 实例组成集群,但相互独立,没有信息交换。
代理服务器 Broker Server:消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
消息主题 Topic:表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。
消息队列 MessageQueue:对于每个 Topic 都可以设置一定数量的消息队列用来进行数据的读取。
消息内容 Message:消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。
标签 Tag:为消息设置的标志,用于同一主题 Topic 下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。
1.4 RockectMQ 工作机制 (★)
RockectMQ 有自己的注册中心,即 NameServer,连接命名服务后会拉取代理服务器的列表到本地缓存,生产者会通过负载均衡选出代理服务器的具体 IP,然后向选出的代理服务器发送消息,最终发送给消费者进行消费。如果发送的消息含有标签Tag,那么会在消费者消费时进行消息的过滤。
其中,每个 Topic 默认有 4 个 MessageQueue,即 4 个写和读队列。在消息中间件每个 topic 设置 4 个队列,主要是为了解决并发性能的问题。如果只有一个队列,为保证线程安全,必须得给队列进行写操作时上锁。设置 4 个队列也是由于大部分的服务器核心数都是 4 核的。
二、RocketMQ 部署相关
2.1 服务器单机部署
搜索资源:rocketmq-all-4.4.0-bin-release.zip
① 将压缩包上传服务器,把rocketmq-all-4.4.0-bin-release.zip
拷贝到 /usr/local/software
② 使用解压命令进行解压到 /usr/local
目录
unzip /usr/local/software/rocketmq-all-4.4.0-bin-release.zip -d /usr/local
③ 软件文件名重命名
mv /usr/local/rocketmq-all-4.4.0-bin-release/ /usr/local/rocketmq-4.4/
④ 设置环境变量
vi /etc/profileexport JAVA_HOME=/usr/local/jdk1.8
export ROCKETMQ_HOME=/usr/local/rocketmq-4.4
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
修改环境变量后,需要 source /etc/profile
使配置文件生效。
⑤ 修改脚本中的 JVM 相关参数和启动参数的配置
vi /usr/local/rocketmq-4.4/bin/runbroker.shJAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
vi /usr/local/rocketmq-4.4/bin/runserver.shJAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
⑥ 修改配置文件,设置 Nameserver 和 Broker-server 部署机器的IP地址。
vi /usr/local/rocketmq-4.4/conf/broker.conf
注:如果是服务器本身可以不设置
⑦ 启动 NameServer
# 1.启动NameServer,& 代表后台输出
nohup sh mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
⑧ 启动 Broker
#1.启动Broker
# nohup sh mqbroker -n 部署的IP地址:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &
nohup sh mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf .查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
⑨ 使用 jps
命令查看是否开启成功,如果看到NamesrvStartup
和BrokerStartup
这两个进程,则证明启动成功。
关闭 nameserver:sh mqshutdown namesrv
关闭 broker:sh mqshutdown broker
另外:服务器需要暂时关闭防火墙 systemctl stop firewalld
,并可使用 firewall-cmd --state
查看防火墙状态。
具体可参考:Linux关闭防火墙命令
2.2 管控台页面
搜索资源:rocketmq-console-ng-1.0.1.jar
在 jar 包的文件夹下新建一个配置文件 application.properties,编辑管控台的端口和 NameServer 中心的 IP 地址,使用 java -jar rocketmq-console-ng-1.0.1.jar
启动即可。
访问 http://localhost:9999/#/
管控台界面如下:
注:管控台要求 jdk1.8。
三、RocketMQ 的基本使用
在一个工程中创建两个模块,模拟生产者和消费者:
3.1 入门案例
添加 rocketmq 的 pom 依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>
生产者模块生产消息:
public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.101:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "helloTopic";// 发送消息for (int i = 0; i < 3; i++) {Message message = new Message(topic, ("RocketMQ普通消息:" + i).getBytes(Charset.defaultCharset()));// 发送完成之后会返回响应结果SendResult result = producer.send(message);System.out.println("发送状态:" + result.getSendStatus());}System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}
消费者模块消费消息:
public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("helloTopic", "*"); // * 消息不过滤// 设置消费模式,默认集群// consumer.setMessageModel(MessageModel.CLUSTERING);// 设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {String content = new String(msg.getBody(), Charset.defaultCharset());System.out.println("线程:" + Thread.currentThread() + "消息内容:" + content);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 通知MQ消费正常// return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 通知MQ消费失败,消费者会重新消费}});// 启动的消费者consumer.start();}
}
3.2 消息发送方式
3.2.1 同步消息
可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。上面演示的案例,就是同步消息发送方式。
应用程序给消息中间件发送消息的时候,需要等待消息中间件将消息存储完毕后才响应回去,业务代码才能往下执行。
发送方式:SendResult result = producer.send(msg);
上面演示的案例就是同步发送。
3.2.2 异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。
应用程序发送消息,消息中间件收到这个消息之后,直接给应用程序响应(此时消息并没有完全存储到磁盘),消息中间件继续存储消息,通过回调地址通知有应用程序存储的结果(成功或失败)。
发送方式:
producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {...}@Overridepublic void onException(Throwable throwable) {...}
});
public class AsynchronousProducer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.100:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "helloTopic";// 发送消息Message message = new Message(topic, ("RocketMQ异步消息").getBytes(Charset.defaultCharset()));System.out.println("消息发送前");// 异步发送,需要传递异步回调消息producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息存储状态:" + sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送出现异常");}});System.out.println("消息发送完毕");TimeUnit.SECONDS.sleep(5); // 为了等回调消息,模拟程序睡眠5s后关闭资源// 关闭资源producer.shutdown();}
}
3.2.3 一次性消息
一次性消息主要用在不特别关心发送结果的场景,例如:日志发送。
发送方式:producer.sendOneway(message);
应用程序给消息中间件发送消息的时候,不需要知道消息是否在消息中间存储了,只管发就是了。
public class OneTimeProducer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.100:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "helloTopic";// 发送消息Message message = new Message(topic, ("RocketMQ一次性消息").getBytes(Charset.defaultCharset()));System.out.println("消息发送前");// 一次性消息producer.sendOneway(message);System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}
3.3 消息消费方式
3.3.1 集群模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。
由于内部会根据 index++ % queue.size()
的方式来决定消息进哪个 messageQueue,因此当多个机器做集群的时候,也可能会发生消息消费分配不均等情况。如下面 topic 中一共有 10 个消息:
入门案例默认就是集群的消费方式:
3.3.2 广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。
设置消费模式:consumer.setMessageModel(MessageModel.BROADCASTING);
3.4 顺序消息
从上文的消费结果来看,在集群状态下,消息的消费顺序是乱序的,但有些场景是要求消息的消费是有序的,这要怎么实现呢?我们考虑以下两个场景:
① 如果在消费者做集群的情况下,由于消息会分散在不同的队列中,因此消息不可保证顺序消费,如:第四个消息比第一个消息更早被消费。因此,可以考虑将消息全放在一个队列中。
注:一个队列只会被一个消费者实例消费,一个消费者实例可以消费多个队列。
② 我们设置消费者的监听模式的时候使用的是 MessageListenerConcurrently 即多线程并发消费的形式,那么当消息全存储在一个队列时,由于 CPU 执行权等问题,消费者实例中多线程会并发的进行消费,也不会保证顺序消费。
// 一个队列对应一个实例的多个线程
consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {...}
});
因此,可以使用 MessageListenerOrderly 让一个队列只对应一个线程。
// 从什么地方开始消费,队头开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 一个队列对应一个实例的一个线程
consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {...}
});
总结:如果想要实现顺序消费,在生产者的角度将消息存储在一个队列中,在消费者的角度就是将消息对应消费者实例里的一个线程。
顺序消费案例,生产者代码如下:
public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.100:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "orderTopic";List<OrderStep> orderSteps = OrderUtil.buildOrders();// 设置队列的选择器// 将需要顺序消费的消息存储到同一个队列中MessageQueueSelector selector = new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("队列的个数:" + list.size()); // 队列个数:4Long orderId = (Long)o; // 传入的参数int index = (int)(orderId % list.size());return list.get(index);}};// 发送消息for (OrderStep orderStep : orderSteps) {Message msg = new Message(topic, orderStep.toString().getBytes(Charset.defaultCharset()));// 指定消息选择器,传入的参数producer.send(msg, selector, orderStep.getOrderId()); // 将订单号传入选择器}System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}
消费者代码如下:
public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyProducerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("orderTopic", "*");// 从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 一个队列对应的一个线程consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",队列ID:" + msg.getQueueId() + ",消息内容:"+ new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});// 启动的消费者consumer.start();}
}
模拟数据:
@Setter
@Getter
public class OrderStep {private long orderId;private String desc;@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}
}
public class OrderUtil {/*** 生成模拟订单数据*/public static List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}
控制台打印:
从控制台打印效果来看,102 订单全部存储在 ID 为 2 的队列当中,并且实现了顺序消费。
3.5 延迟消息
延时消息是 RocketMQ 延时发送给消费者消费的消息,典型应用场景如:订单超时未支付等。其不支持任意时间的延时,需要设置几个固定的延时等级,从 1s 到 2h 分别对应着等级 1 到 18。
等级 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
延时 | 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
生产者:
public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("helloGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.101:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "helloTopic";// 发送消息SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Message message =new Message(topic, ("延时消息,发送时间:" + cusFormat.format(new Date())).getBytes(Charset.defaultCharset()));// 设置消息延时级别message.setDelayTimeLevel(3);// 发送完成之后会返回响应结果SendResult result = producer.send(message);System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}
消费者:
public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delayConsumerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("helloTopic", "*");// 设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");for (MessageExt msg : list) {System.out.println("消费时间:" + cusFormat.format(new Date()) + ",消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();}
}
控制台打印:
3.6 消息过滤
3.6.1 Tag 过滤
RocketMQ 的消息标签(Message Tag)是一种简单的路由机制,允许消费者根据标签来过滤并只消费感兴趣的消息。要实现消息标签的过滤,需要在发送消息时设置标签,并在消费者端配置标签过滤器。以下示例展示如何使用的标签过滤功能:
生产者设置消息标签并发送消息:
public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("tagProducesGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.101:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "tagFilterTopic";// 发送消息Message message1 = new Message(topic, "TagA", ("消息A").getBytes(Charset.defaultCharset()));Message message2 = new Message(topic, "TagB", ("消息B").getBytes(Charset.defaultCharset()));Message message3 = new Message(topic, "TagC", ("消息C").getBytes(Charset.defaultCharset()));producer.sendOneway(message1);producer.sendOneway(message2);producer.sendOneway(message3);System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}
消费者端配置标签过滤器:
public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("tagFilterTopic", "TagA || TagC"); // 只消费 TagA 和 TagCconsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println("消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();}
}
控制台打印:
3.6.2 SQL92 过滤
RocketMQ 的 SQL92 过滤器是一种基于消息属性的条件筛选机制,允许消费者只接收满足特定条件的消息。要使用 SQL92 过滤器,需要在消费者端设置过滤条件。以下是示例,演示如何在消费者端设置 SQL92 过滤器:
生产者设添加属性 putUserProperty(key, value)
public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer = new DefaultMQProducer("sqlProducesGroup");// 连接nameSeverproducer.setNamesrvAddr("192.168.63.100:9876");// 启动生产者producer.start();// 设置消息发送的目的地String topic = "sqlFilterTopic";// 发送消息Message message1 = new Message(topic, "TagA", ("消息A").getBytes(Charset.defaultCharset()));message1.putUserProperty("age", "22");message1.putUserProperty("weight", "45");Message message2 = new Message(topic, "TagB", ("消息B").getBytes(Charset.defaultCharset()));message2.putUserProperty("age", "30");message2.putUserProperty("weight", "50");Message message3 = new Message(topic, "TagC", ("消息C").getBytes(Charset.defaultCharset()));message3.putUserProperty("age", "15");message3.putUserProperty("weight", "48");producer.sendOneway(message1);producer.sendOneway(message2);producer.sendOneway(message3);System.out.println("消息发送完毕");// 关闭资源producer.shutdown();}
}
消费者设置过滤条件:
public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");// 设置nameSever地址consumer.setNamesrvAddr("192.168.63.101:9876");// 设置订阅的主题consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age<25 and weight<47"));// 一个队列对应的一个线程consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println("消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();}
}
注意:
① 过滤条件支持以下形式:
数值比较,如:>,>=,<,<=,BETWEEN,=
字符比较,如:=,<>,IN
IS NULL 或者 IS NOT NULL
逻辑符号 AND,OR,NOT
常量支持类型为:
数值,如:**123,3.1415;
字符,如:‘abc’,必须用单引号包裹起来
NULL,特殊的常量
布尔值,TRUE 或 FALSE
② 在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf,添加参数enablePropertyFilter=true,重启 broker 代理服务器。
vi /usr/local/rocketmq-4.4/conf/broker.conf
四、SpringBoot 集成 RocketMQ
4.1 入门案例
1、添加 pom 依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version>
</dependency>
2、生产者模块添加配置
rocketmq:name-server: 192.168.63.101:9876producer:group: my-group
3、消费者模块添加配置
rocketmq:name-server: 192.168.63.101:9876
4、生产者模块生产消息
@SpringBootTest
public class RocketMQTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void sendMsg() {Message<String> msg = MessageBuilder.withPayload("发送消息").build();rocketMQTemplate.send("helloTopicBoot", msg);}
}
注意:① Message 对象是 Spring 框架提供的对象 import org.springframework.messaging.Message;
② rocketMQTemplate.send(destination, message)
方法是同步的发送方式。
5、消费者模块消费消息
@Component
// 消费者名字叫 helloConsumerGroup,消费的生产组叫 helloTopicBoot
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot", messageModel = MessageModel.BROADCASTING)
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("收到消息:" + new String(messageExt.getBody(), Charset.defaultCharset()));}
}
4.2 消息发送方式
下文不阐述具体的发送细节,细节参考上文。
4.2.1 同步消息
@Test
public void sendSYNMsg() throws InterruptedException {Message<String> msg = MessageBuilder.withPayload("发送同步消息").build();rocketMQTemplate.syncSend("helloTopicBoot", msg);
}
4.2.2 异步消息
@Test
public void sendASYNMsg() throws InterruptedException {Message<String> msg = MessageBuilder.withPayload("发送异步消息").build();rocketMQTemplate.asyncSend("helloTopicBoot", msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送状态:" + sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送失败");}});TimeUnit.SECONDS.sleep(5);
}
4.2.3 一次性消息
@Test
public void sendOnewayMsg() {Message<String> msg = MessageBuilder.withPayload("发送一次性消息").build();rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
}
4.3 消息消费方式
这里模拟生产者发送一次性消息10次:
@Test
public void sendOnewayMsgLoop() {for (int i = 0; i < 10; i++) {Message<String> msg = MessageBuilder.withPayload("发送一次性消息:" + i).build();rocketMQTemplate.sendOneWay("helloTopicBoot", msg);}
}
4.3.1 集群模式
默认情况下,消费者采用负载均衡方式消费消息,即采用集群模式,也可以在 @RocketMQMessageListener
注解中设置 messageModel
属性来改变消费模式。
@Component
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot",messageModel = MessageModel.CLUSTERING) // 设置消费模式
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("收到消息:" + new String(messageExt.getBody(), Charset.defaultCharset()));}
}
控制台打印:
4.3.2 广播模式
设置广播模式:messageModel = MessageModel.BROADCASTING
@Component
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING) // 设置消费模式
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {...}
}
控制台打印:
4.4 顺序消息
生产者设置队列选择器,需要将顺序消息放在同一个队列:
@Test
public void sendOrderlyMsg() {// 设置队列的选择器// 将需要顺序消费的消息存储到同一个队列中rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {String orderIdStr = (String)o; // 传入参数Long orderId = Long.valueOf(orderIdStr);int index = (int)(orderId % list.size());return list.get(index);}});List<OrderStep> orderSteps = OrderUtil.buildOrders();// 发送消息for (OrderStep step : orderSteps) {Message<String> msg = MessageBuilder.withPayload(step.toString()).build();rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot", msg, String.valueOf(step.getOrderId()));}
}
消费者默认一个队列是线程并发消费,可以通过设置 consumeMode = ConsumeMode.ORDERLY
,将一个消息队列对应消费者的一个线程,以实现顺序消费:
@Component
@RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot", topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY) // 设置一个队列对应一个线程
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("当前线程:" + Thread.currentThread().getId() + ",队列ID:" + messageExt.getQueueId() + ",消息内容:"+ new String(messageExt.getBody(), Charset.defaultCharset()));}
}
控制台打印:
4.5 延时消息
@Test
public void sendDelayMsg() {SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Message<String> msg = MessageBuilder.withPayload("发送延时消息,发送时间:" + cusFormat.format(new Date())).build();// 设置延时等级3,这个消息将在 10s 之后发送(详看delayTimeLevel),消息在任务队列里存储,10s 后发送// 3000 代表同步等待 3s,若超过 3s 消息队列都没有响应,自动断开链接rocketMQTemplate.syncSend("helloTopicBoot", msg, 3000, 3);
}
4.6 消息过滤
4.6.1 Tag 过滤
生产者生产消息,Topic 和 Tag 以 “:” 分割( “:” 前后不能有空格)
@Test
public void sendTagFilterMsg() {Message<String> msg1 = MessageBuilder.withPayload("消息A").build();rocketMQTemplate.send("tagFilterBoot:TagA", msg1);Message<String> msg2 = MessageBuilder.withPayload("消息B").build();rocketMQTemplate.send("tagFilterBoot:TagB", msg2);Message<String> msg3 = MessageBuilder.withPayload("消息C").build();rocketMQTemplate.send("tagFilterBoot:TagC", msg3);
}
消费者设置过滤条件:
@Component
@RocketMQMessageListener(consumerGroup = "tagFilterConsumerBoot", topic = "tagFilterBoot", selectorExpression = "TagA || TagC") // selectorExpression 过滤条件
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("消息内容:" + new String(messageExt.getBody(), Charset.defaultCharset()));}
}
控制台打印:
4.6.2 SQL92 过滤
使用 setHeader(String headerName, Object headerValue)
方法设置过滤条件:
@Test
public void sendSQL92FilterMsg() {Message<String> msg1 = MessageBuilder.withPayload("美女A").setHeader("age", 22).setHeader("weight", 90).build();rocketMQTemplate.send("SQL92FilterBoot", msg1);Message<String> msg2 = MessageBuilder.withPayload("美女B").setHeader("age", 20).setHeader("weight", 100).build();rocketMQTemplate.send("SQL92FilterBoot", msg2);Message<String> msg3 = MessageBuilder.withPayload("美女C").setHeader("age", 25).setHeader("weight", 120).build();rocketMQTemplate.send("SQL92FilterBoot", msg3);
}
消费者设置 selectorType
与 selectorExpression
:
@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterConsumerBoot", topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92, selectorExpression = "age<25 and weight>90") // 设置
public class Sql92FilterTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("消息内容:" + new String(messageExt.getBody(), Charset.defaultCharset()));}
}
控制台打印:
注:在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf,具体参考 3.6.2
文章参考:Java微服务商城高并发秒杀项目实战|Spring Cloud Alibaba真实项目实战+商城双11秒杀+高并发+消息+支付+分布式事物Seata
相关文章:

从环境部署到开发实战:消息队列 RocketMQ
文章目录 一、消息队列简介1.1 什么是消息队列1.2 常见消息队列对比1.3 RockectMQ 核心概念1.4 RockectMQ 工作机制 (★) 二、RocketMQ 部署相关2.1 服务器单机部署2.2 管控台页面 三、RocketMQ 的基本使用3.1 入门案例3.2 消息发送方式3.2.1 同步消息3.…...

【机器学习(九)】分类和回归任务-多层感知机(Multilayer Perceptron,MLP)算法-Sentosa_DSML社区版
文章目录 一、算法概念二、算法原理(一)感知机(二)多层感知机1、隐藏层2、激活函数sigma函数tanh函数ReLU函数 3、反向传播算法 三、算法优缺点(一)优点(二)缺点 四、MLP分类任务实现…...

渗透测试-文件上传绕过思路
文件上传绕过思路 引言 分享一些文件上传绕过的思路,下文内容多包含实战图片,所以打码会非常严重,可多看文字表达;本文仅用于交流学习, 由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#x…...

等保测评中的密码学应用分析
等保测评中密码学应用的分析 等保测评(信息安全等级保护测评)是中国信息安全领域的一项重要活动,旨在评估信息系统的安全性,并根据评估结果给予相应的安全等级。在等保测评中,密码学应用分析是评估信息系统安全性的关键…...

LCR 007. 三数之和
文章目录 1.题目2.思路3.代码 1.题目 LCR 007. 三数之和 给定一个包含 n 个整数的数组 nums,判断 nums 中是否存在三个元素 a ,b ,c *,*使得 a b c 0 ?请找出所有和为 0 且 不重复 的三元组。 示例 1:…...

【入门01】arcgis api 4.x 创建地图、添加图层、添加指北针、比例尺、图例、卷帘、图层控制、家控件(附完整源码)
1.效果 2.代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title></title><link rel"s…...

STL迭代器标签
STL(标准模板库)迭代器标签是用来标识迭代器类型的分类机制。这些标签有助于确定迭代器的特性,比如它是否可以进行随机访问、是否支持修改元素等。主要的迭代器标签包括: Input Iterator:只读迭代器,可以顺…...

容器学习之SparseArray源码解析
1、SparseArray是android sdk 提供集合类,主要用来替换key 为int类型,value为Object类型的Hashmap 2、SparseArray和HashMap相比优缺点: 优点: 1、SparseArray存在一个int[]keys, 因此避免自动装箱 2、SparseArray扩容时只需要数…...

信创改造技术介绍
目录 服务发现和注册 Sentinel 核心功能 典型应用场景 gateway 网关的主要功能 Spring Cloud Gateway Kong Kong 的主要功能 Kong 的架构: Kong 的使用场景: Kong 的部署模式: 优势 Gateway与Sentinel区别 Gateway Sentinel …...

【可见的点——欧拉函数】
在数论,对正整数n,欧拉函数是小于或等于n的正整数中与n互质的数的数目(不包括1) 题目 思路 有三个点比较特殊(因为一来这三个点一定可见,同时也无法用gcd 1判断):(0&am…...

Maven重点学习笔记(包入门 2万字)
Maven依赖管理项目构建工具 尚硅谷 5h 2023最新版 一,Maven简介 1.为什么学习Maven 1.1, Maven是一个依赖管理工具 1️⃣ jar包的规模 随着我们使用越来越多的框架,或者框架封装程度越来越高,项目中使用的jar包也越来越多。项目中&…...

1.分页查询(后端)—— Vue3 + SpringCloud 5 + MyBatisPlus + MySQL 项目系列(基于 Zulu 11)
本手册是基于 Vue3 SpringCloud5 MyBatisPlus MySQL 的项目结构和代码实现,旨在作为一个教学案例进行讲解。为了使案例更具普适性,文档中的公司名称、实体类、表名以及字段名称等敏感信息均已脱敏。 项目结构概述 项目采用标准的分层架构࿰…...

机器学习与深度学习的区别:深入理解与应用场景
在人工智能(AI)的广阔领域中,机器学习和深度学习是两个核心概念,它们虽然紧密相关,但在定义、技术、数据处理能力、应用场景等方面存在显著差异。本文将深入探讨这些区别,帮助读者更好地理解并选择合适的技…...

C++学习笔记(45)
322、循环队列、信号量、生产/消费者模型的源代码 一、demo1.cpp // demo1.cpp,本程序演示循环队列的使用。 #include "_public.h" int main() { using ElemTypeint; squeue<ElemType,5> QQ; ElemType ee; // 创建一个数据元素。 cout << &qu…...

【2】图像视频的加载和显示
文章目录 【2】图像视频的加载和显示一、代码在哪写二、创建和显示窗口(一)导入OpenCV的包cv2(二)创建窗口(三)更改窗口大小 & 显示窗口(四)等待用户输入补充:ord()函…...

1. BOOT.BIN 2. 固化 3. 启动 4. SDK 5. 文件
在进行FPGA的开发与固化过程中,生成BOOT.BIN文件是一个重要的步骤。BOOT.BIN文件通常包含了系统启动所需的不同文件,以下是如何创建和使用该文件的详细说明。 ### 生成BOOT.BIN文件的步骤 1. **方法一:通过项目构建** - 右键单击项目…...

vue按钮接收键盘回车事件
了解了!如果您想让 Submit 按钮在按下回车键时被触发,可以在 Vue 组件中监听全局的键盘事件。以下是实现这一功能的示例: 示例代码 <template><div><inputtype"text"v-model"inputValue"placeholder&qu…...

腾讯云点播及声音上传
文章目录 1、开通腾讯云点播2、获取腾讯云API密钥3、完成声音上传3.1、引入依赖3.2、参考:接入点地域3.3、参考:任务流设置3.4、首先修改配置:3.4.1、 3.5、TrackInfoApiController --》 uploadTrack()3.6、VodServiceImpl --》 uploadTrack(…...

如何查看服务器是否有raid阵列卡以及raid类型
要查看服务器是否配置了RAID阵列卡以及RAID的类型,可以使用多种方法。以下是一些常用的命令和步骤: 1. 使用 lspci 命令 这个命令可以列出所有的PCI设备,包括RAID控制器。 lspci | grep -i raid 如果输出中有RAID相关的设备信息,那…...

工博会动态 | 来8.1馆 看桥田如何玩转全场
北京时间2024年9月24日,中国国际工业博览会开幕,桥田智能(8.1馆A001)推出心意三重奏,有没有小伙伴们发现呢?现在,让我们一起city walk下! 桥田显眼包横空出道 有小伙伴已经发现&…...

新版torch_geometric不存在uniform、maybe_num_nodes函数问题(Prune4ED论文报错解决)
这是在复现论文“Towards accurate subgraph similarity computation via neural graph pruning”时遇到的报错。 ImportError: cannot import name uniform from torch_geometric.nn.pool.topk_pool 一、报错原因 论文作者使用的是2.1.0版本的torch_geometric。而我安装了2.…...

实现简易 vuedraggable 的拖拽排序功能
一、案例效果 拖拽计数4实现手动排序 二、案例代码 <draggable:list"searchResult.indicator":group"{ name: indicators }"item-key"field"handle".drag-handle-icon"><divclass"field-item"v-for"(item…...

第L2周:机器学习|线性回归模型 LinearRegression:2. 多元线性回归模型
本文为365天深度学习训练营 中的学习记录博客原作者:K同学啊 任务: ●1. 学习本文的多元线形回归模型。 ●2. 参考文本预测花瓣宽度的方法,选用其他三个变量来预测花瓣长度。 一、多元线性回归 简单线性回归:影响 Y 的因素唯一&…...

JavaScript的条件语句
if条件语句 if结构先判断一个表达式的布尔值,然后根据布尔值的真伪,执行不同的语句。所谓布尔值,指的是JavaScript 的两个特殊值,true表示真,false表示伪。 if语句语法规范 if(布尔值){语句;}var m3if(m3){console.l…...

vue3 vite模式配置测试,开发、生产环境以及代理配置
1、首先在根目录下创建三个文本文件:.env.development,.env.production,.env.test .env.development中的内容为: // 开发环境 .env.development NODE_ENV development VITE_APP_MODE development VITE_OUTPUTDIR dist_dev /…...

【rabbitmq-server】安装使用介绍
在 1050a 系统下安装 rabbitmq-server 服务以及基本配置;【注】:改方案用于A版统信服务器操作系统 文章目录 功能概述功能介绍一、安装软件包二、启动服务三、验证四、基本配置功能概述 RabbitMQ 是AMQP的实现,高性能的企业消息的新标准。RabbitMQ服务器是一个强大和可扩展…...

Kafka系列之:安装部署CMAK,CMAK管理大型Kafka集群参数调优
Kafka系列之:安装部署CMAK,CMAK管理大型Kafka集群参数调优 一、CMAK二、要求三、配置四、启动服务五、使用 Security 启动服务六、消费者/生产者滞后七、从 Kafka Manager 迁移到 CMAK八、CMAK管理大型Kafka集群参数调优九、后台运行CMAK十、输出日志一、CMAK CMAK(之前称为…...

c语言200例 64
大家好,欢迎来到无限大的频道。 今天带领大家来学习c语言。 题目要求: 设计一个进行候选人的选票程序。假设有三位候选人,在屏幕上输入要选择的候选人姓名, 有10次投票机会,最后输出每个人的得票结果。好的ÿ…...

[leetcode]216_组合总和III_给定数字范围且输出无重复
找出所有相加之和为 n 的 k 个数的组合,且满足下列条件: 只使用数字1到9 每个数字 最多使用一次 返回 所有可能的有效组合的列表 。该列表不能包含相同的组合两次,组合可以以任何顺序返回。示例 1: 输入: k 3, n 7 输出: [[1,2,4]] 解释: 1…...

Doris 2.x 安装及使用
Doris 2.x 安装及使用 简介 Apache Doris 是一款基于 MPP 架构的高性能、实时的分析型数据库,以高效、简单、统一的特点被人们所熟知,仅需亚秒级响应时间即可返回海量数据下的查询结果,不仅可以支持高并发的点查询场景,也能支持…...