Rocketmq技术详解
Rocketmq技术详解
运维部署 docker-compose.yml
version: '3.5'
services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rmqbrokerports:- 10909:10909- 10911:10911volumes:- ./logs:/opt/logs- ./store:/opt/store- ./conf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- 8080:8080environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge
然后在与docker-compose.yml
同级下面相应的建立三个文件夹conf
、logs
、store
。然后在conf
文件夹下面建立broker.conf
配置文件,所有文件的目录位置如下所示。
docker-compose.yml
conf- broker.conf
logs
store
然后在编写broker.conf
配置文件里面的内容
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.# 所属集群名字
brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a# 0 表示 Master,> 0 表示 Slave
brokerId=0# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.1.16# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口
listenPort=10911# 删除文件时间点,默认凌晨4点
deleteWhen=04# 文件保留时间,默认48小时
fileReservedTime=120# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
配置文件中的内容我们只需要改动一点即可,即brokerIP1
这个属性,我们将其更改为我们本机的ip,可以利用ipconfig
进行查看。
修改完以后我们直接在docker-compose.yml
文件所在的位置输入命令docker-compose up
即可启动。启动成功以后在浏览器中输入http://localhost:8080/
即可看到管理页面,就表示我们搭建成功了。
架构设计
1 技术架构(参考来源于官网)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T2Lk6A0V-1678195927847)(null)]
RocketMQ架构上主要分为四部分,如上图所示:
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer和Consumer仍然可以动态感知Broker的路由的信息。
- BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
- Remoting Module:整个Broker的实体,负责处理来自Client端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
2 部署架构
RocketMQ 网络部署特点
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
结合部署架构图,描述集群工作流程:
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
3.内容总结:
结合部署架构图,描述集群工作流程:
1、启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
2、Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
3、Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
4、Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
总结:
Rocketmq发送消息的时候,先启动NameServer,NameServer成功启动会先去和Broken连接,这时候NameServer和Broken就有心跳。当生产者Producer发送消息的时候,先跟NameServer连接,判断发送的Topic在哪些Broken上,然后按轮询选择Broken中的一个队列。然后Producer会和Broken直接建立连接,以后所有发送的消息(同个Topic)都是直接和Broken连接,消费者也是这样流程。
补充:
集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。
ps:表示改消费组有一条未消费
数量2 指的是 consumer_topic-queue-three 有两个消费组都是叫这个名字
4.运行演示
4.1.1同步发送
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
说几个概念
4.1.1.1 生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
4.1.1.2 消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
4.1.1.3 集群消费(Clustering)(默认是集群消费)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
4.1.1.4 广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
例如代码:生成者发送消息
//同步发送
public void sync() {Message<String> message = new Message<>();message.setId(UUID.randomUUID().toString());message.setContent("Hello, springboot-ac-rocketmq !");rocketMQTemplate.convertAndSend("topic-queue-one", message);rocketMQTemplate.convertAndSend("topic-queue-two", "Hello, springboot-ac-rocketmq !");
}
消费者消费消息
@Slf4j
@Component
public class RocketmqConsumer {@Component@RocketMQMessageListener(topic = "topic-queue-one", consumerGroup = "consumer_topic-queue-one")public class ConsumerOne implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {log.info("consumer-one received message: {}", message);}}@Component@RocketMQMessageListener(topic = "topic-queue-two", consumerGroup = "consumer_topic-queue-two")public class ConsumerTwo implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("哈哈哈哈我进来消费 topic-queue-two 消息啦");log.info("consumer-two received message: {}", message);}}
}
运行后打断点发送,队列是先进后去,所以topic-queue-two先消费
消费完再消费这个topic-queue-one
4.1.2异步发送(比较重要)
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
看代码 生成者发送消息
public void async() {Message<String> message = new Message<>();message.setId(UUID.randomUUID().toString());message.setContent("Hello,I am asyncSend !");rocketMQTemplate.asyncSend("async-one", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("send successful");}@Overridepublic void onException(Throwable throwable) {log.info("send fail; {}", throwable.getMessage());}});
}
消费者代码
@Component
@RocketMQMessageListener(topic = "async-one", consumerGroup = "consumer_topic-queue-three")
public class ConsumerThreee implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {System.out.println("哈哈哈哈我进来消费 async-one 消息啦");log.info("consumer-two received message: {}", message);}
}
运行后打断点发现可以正常消费
从运行的结果看,15:37分是发送成功的,可是消费是15:39分
4.1.3 单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
rocketMQTemplate.sendOneWay("topic-oneWay", "send one-way message");
一调接口里面就返回成功,可是消费等了一会菜消费
5.消费消息的顺序性(比较重要)
5.1简介:
- 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
- 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
技术原理:
就是用hashKey作为每个队列的唯一标志,在电商中,一般是引订单id作为hashKey
5.2 demo演示
模拟2个队列,id1和id2进行操作
id1的消息有10,30
id2的消息有 20,40
演示代码如下:
private final String id1 = "10086";private final String id2 = "10087";/**** hashKey为订单id*/
public void testSendSyncOrderly1() {Message<String> stringMessage = new Message<>();stringMessage.setId(id1);String message = "10";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id1);
}/**** hashKey为订单id*/
public void testSendSyncOrderly2() {Message<String> stringMessage = new Message<>();stringMessage.setId(id2);String message = "20";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id2);
}/**** hashKey为订单id*/
public void testSendSyncOrderly3() {Message<String> stringMessage = new Message<>();stringMessage.setId(id1);String message = "30";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id1);
}/**** hashKey为订单id*/
public void testSendSyncOrderly4() {Message<String> stringMessage = new Message<>();stringMessage.setId(id2);String message = "40";stringMessage.setContent(message);// 模拟有序消费rocketMQTemplate.syncSendOrderly("topic-orderly", stringMessage, id2);
}
消费端代码
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic-orderly",consumerGroup = "orderly-consumer-group", consumeMode = ConsumeMode.ORDERLY
)public class OrderConsumer implements RocketMQListener<Message> {int sumId1 = 0;int sumId2 = 0;@Overridepublic void onMessage(Message message) {if(message.getId().equals("10086")){sumId1 = sumId1+Integer.parseInt((String)message.getContent());}else{sumId2 = sumId2+Integer.parseInt((String)message.getContent());}System.out.println("开始消费");log.info("========{}=======", sumId1);log.info("========{}=======", sumId2);System.out.println("消费结束");}}
发现最后消费是
证明是分区有序性。
6.延时消息样例
应用场景:
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
CONSUME_FROM_LAST_OFFSET, //第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
CONSUME_FROM_MIN_OFFSET,
CONSUME_FROM_MAX_OFFSET,
CONSUME_FROM_FIRST_OFFSET, //第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP; //第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 (一般选这个)
消费端要实现这个类RocketMQPushConsumerLifecycleListener,代码如下:
/**** 延时消费*/
@Component
@Slf4j
public class OffsetConsumerByHjt {@Component@RocketMQMessageListener(topic = "topic-offset-by-hjt", consumerGroup = "topic-offset-by-hjt-consumer")public class OfferConsumerBy implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(Message message) {System.out.println("哈哈哈哈我进来消费");String result = result(message.getBody());System.out.println("输出 result "+result);log.info("topic-offset-by-hjt: {}", new String(message.getBody()));}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {//第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费defaultMQPushConsumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);}}public static String result(byte[] decrypt) {try {String result = new String(decrypt, "UTF-8");return result;} catch (UnsupportedEncodingException var2) {var2.printStackTrace();return null;}}
}
生成者代码,还是按顺序消费测试
/**** hjt写的延时消费demo*/public void sendByHjt() throws Exception {Message message = new Message();//生产者DefaultMQProducer producer = new DefaultMQProducer("topic-offset-by-hjt-product");producer.setNamesrvAddr("192.168.1.219:9876");producer.start();for(int i = 0;i<5;i++){message.setTopic("topic-offset-by-hjt");message.setBody(("我是延迟消费啊啊" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";//4对于的就是延迟30smessage.setDelayTimeLevel(4);producer.send(message, new SendCallback() {//成功后执行的方法@Overridepublic void onSuccess(SendResult sendResult) {log.info("延迟消费成功");}//失败后执行的方法@Overridepublic void onException(Throwable throwable) {log.error("还未到指定的消费时间");}});}//关闭生产者producer.shutdown();}
30s后发现已进来消费
为什么能同时消费这么多数据,因为rocketmq在那一瞬间同时去队列中拿数据,那一瞬间一起消费掉。
看了下后台,发现读和写都是4个队列。其中 perm为6 指的是可读可写的队列为6
6.根据Tag演示
1.生成者代码
/*** @author hjt* @date 2019/8/21*/
@Component
@Slf4j
public class TagProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void sendTagsMessage() {String[] tags = new String[]{"A", "B", "C", "D"};String message = "tags message : ";for (int i = 0; i < tags.length; i++) {rocketMQTemplate.syncSend("topic-tags:" + tags[i], message + tags[i]);}}
}
2.消费者代码
/*** @author hjt* @date 2019/8/21*/
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic-tags",consumerGroup = "tags-consumer-group",selectorExpression = "A||C")
public class TagConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("messgaetag:"+message);log.info("======={}=======", message);}
}
运行结果: 因为 selectorExpression = “A||C” 选择A和C
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JN0sDuGu-1678195925254)(Rocketmq%E6%8A%80%E6%9C%AF%E8%AF%A6%E8%A7%A3.assets/image-20220801163823708.png)]
ps:注意
rocketMQTemplate.syncSend("topic-tags:" + tags[i], message + tags[i]); //topic-tags: 一定要有冒号
7.分布式事务
(1)相关概念
RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:
1、Half(Prepare) Message——半消息(预处理消息)
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
2、Message Status Check——消息状态回查
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。
1、A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。
2、当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
3、执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
4.1)、如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。
4.2)、如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
4.3)、如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。
什么情况会回查
也会有两种情况
1)执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit或者rollback)导致最终返回UNKNOW,那么就会回查。2) 本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在brock端它还是个Half Message(半消息),这也会回查。
特别注意: 如果回查,那么一定要先查看当前事务的执行情况,再看是否需要重新执行本地事务。
想象下如果出现第二种情况而引起的回查,如果不先查看当前事务的执行情况,而是直接执行事务,那么就相当于成功执行了两个本地事务。
为什么说MQ是最终一致性事务
通过上面这幅图,我们可以看出,在上面举例事务不一致的两种情况中,永远不会发生
A账户减100 (失败),B账户加100 (成功)
因为:如果A服务本地事务都失败了,那B服务永远不会执行任何操作,因为消息压根就不会传到B服务。
那么 A账户减100 (成功),B账户加100 (失败) 会不会可能存在的。
答案是会的
因为A服务只负责当我消息执行成功了,保证消息能够送达到B,至于B服务接到消息后最终执行结果A并不管。
那B服务失败怎么办?
如果B最终执行失败,几乎可以断定就是代码有问题所以才引起的异常,因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功。
如果是代码的原因引起多次重试失败后,也没有关系,将该异常记录下来,由人工处理
,人工兜底处理后,就可以让事务达到最终的一致性。
补充说明
(2)消息事务样例
事务消息共有三种状态,提交状态、回滚状态、中间状态:
-
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
-
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
-
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
事务消息使用上的限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionalMessageCheckListener
类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionTimeout
参数。 - 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
(3)演示demo
看 生产者代码
package com.hjt.transaction;import com.hjt.message.Message;
import com.hjt.message.MessageTransaction;
import com.hjt.transaction.mapper.TransactionMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.UUID;/*** @author hjt* @date 2019/8/20*/
@Component
@Slf4j
public class TransactionProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void produce() {MessageTransaction<String> message = new MessageTransaction<>();//在正在的业务中 Aid和Bid应该是前端已经知道是啥,传给后端,比如A的userId和B的UserIdmessage.setAId(UUID.randomUUID().toString());message.setBId(UUID.randomUUID().toString());message.setContent("B即将要+100元,A要减100元");log.info("========sending message=========:{}",message);
// rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null); 2.0.3有这个版本 tx-grouprocketMQTemplate.sendMessageInTransaction( "topic-tx", MessageBuilder.withPayload(message).build(), null);log.info("========finish send =========");}}
监听者代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.StringUtils;import java.util.concurrent.ConcurrentHashMap;/*** @author hjt* @date 2019/8/20*/
@Slf4j
//@RocketMQTransactionListener(txProducerGroup = "tx-group") 2.0.3的版本有这个
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {/**** 存放事务的状态 支持并发的场景*/private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {log.info("==============进到这里说明 Half Message 发送成功");//获取队列中的事务idString rocketmqTransactionId = getRocketmqTransactionId(msg);try{//模拟 执行A服务-100元操作int redMoneyByA = -100;//定义// 0 是中间状态 1 是提交事务状态 2是回滚事务localTrans.put(rocketmqTransactionId,1);//模拟 执行A服务-100元操作失败
// int redMoneyExceptionByA = 100/0;return RocketMQLocalTransactionState.UNKNOWN;}catch (Exception e){// 执行A服务-100元操作出现异常就 事务回查 调用下面的checkLocalTransaction方法localTrans.put(rocketmqTransactionId,2);log.error("插入数据库失败,原因为:{}",e.getMessage());return RocketMQLocalTransactionState.UNKNOWN;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {log.info("============== 模拟回查本地事务 checkLocalTransaction");Object payload = msg.getPayload();MessageHeaders headers = msg.getHeaders();System.out.println("输出:" + payload);System.out.println("输出:" + headers);String rocketmqTransactionId = getRocketmqTransactionId(msg);//查rocketmqTransactionId的事务状态Integer status = localTrans.get(rocketmqTransactionId);if(null!=status){switch (status){case 0:log.info("============== 模拟回查本地事务结束 提交状态为:UNKNOWN");return RocketMQLocalTransactionState.UNKNOWN;case 1:log.info("============== 模拟回查本地事务结束 提交状态为:COMMIT");return RocketMQLocalTransactionState.COMMIT;case 2:log.info("============== 模拟回查本地事务结束 提交状态为:ROLLBACK");return RocketMQLocalTransactionState.ROLLBACK;}}log.info("============== 模拟回查本地事务结束,提交状态为 ROLLBACK");return RocketMQLocalTransactionState.ROLLBACK;}/**** 获取事务id* @param msg* @return*/public String getRocketmqTransactionId(Message msg){JSONObject json = JSONUtil.parseObj(msg.getHeaders(), false, true);String rocketmqTransactionId = (String)json.get("rocketmq_TRANSACTION_ID");String topic = (String)json.get("rocketmq_TOPIC");log.info("=======事务id========{}",rocketmqTransactionId);log.info("=======topic========{}",topic);if(!StringUtils.isEmpty(rocketmqTransactionId)){return rocketmqTransactionId;}return "";}
消费者代码
package com.hjt.transaction;import com.hjt.message.MessageTransaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** @author hjt* @date 2019/8/20*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "topic-tx", consumerGroup = "tx-consumer-group")
public class TransactionConsumer implements RocketMQListener<MessageTransaction> {@Overridepublic void onMessage(MessageTransaction message) {log.info("topic-tx received message: {}", message);log.info("消费端开始消费信息 执行B服务加100操作");//执行B服务加100的操作try{//B服务加100int addMoneyByB = 100;}//如果B服务加100失败,可是A已经减100成功了,这时候要把异常记录下来,人工进行处理catch (Exception e){log.error("B服务加100异常,需要人工处理,异常信息为:{}",e.getMessage());//用一张异常表单独记录 该消息的id 可以作为异常表的主键String id = message.getBId();}}}
rocketmq会稍微等一点时间再去执行checkLocalTransaction方法
正常运行结果:
模拟下执行A操作异常的时候
运行结果:
相关文章:
Rocketmq技术详解
Rocketmq技术详解 运维部署 docker-compose.yml version: 3.5 services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxisw…...
TeeChart VCL/FMX v2023 crack
TeeChart VCL/FMX v2023 crack TeeChart Pro VCL允许您为所有领域(包括商业、工程、金融、统计、科学、医疗、实时和网络)创建通用和专用图表和绘图应用程序。TeeChart Pro VCL具有多种图表类型的图表库,包括2D或3D线条、条形图、水平条、区域、点、饼图、箭头、气泡…...
[Java·算法·困难]LeetCode32. 最长有效括号
每天一题,防止痴呆题目示例分析思路1题解1分析思路2题解2分析思路3题解3👉️ 力扣原文 题目 给你一个只包含 ( 和 ) 的字符串,找出最长有效(格式正确且连续)括号子串的长度。 示例 输入:s "(()&q…...
pytorch如何搭建一个最简单的模型,
一、搭建模型的步骤 在 PyTorch 中,可以使用 torch.nn 模块来搭建深度学习模型。具体步骤如下: 定义一个继承自 torch.nn.Module 的类,这个类将作为我们自己定义的模型。 在类的构造函数 __init__() 中定义网络的各个层和参数。可以使用 to…...
JS实现css的hover效果,兼容移动端
Hi I’m Shendi JS实现css的hover效果,兼容移动端 功能概述 CSS的hover即触碰时触发,在电脑端鼠标触碰,移动端手指触摸 有的时候光靠css实现不了一些效果,例如元素触发hover,其他元素触发动画效果,所以需要…...
企业微信的后台怎么进入和管理?
企业微信管理后台,只有企业的管理员才可以进企业微信后台,普通员工想要进入后台、可以联系管理员将你设置为后台管理员。 一、怎么进入企业微信后台 管理员进入企业微信后台有两种路径; 路径一: 企业管理员直接在浏览器搜索企…...
【2223sW2】LOG2
写在前面 好好学习,走出宿舍,走向毕设! 一些心路历程记录,很少有代码出现 因为鬼知道哪条代码到时候变成毕设的一部分了咧,还是不要给自己的查重挖坑罢了 23.3.2 检验FFT 早上师兄帮忙看了一眼我画的丑图ÿ…...
buuctf-web-[SUCTF 2018]MultiSQL1
打开界面,全部点击一遍,只有注册和登录功能可以使用注册一个账号,注册admin提示用户存在,可能有二次注入,注册admin自动加了一个字符,无法二次注入,点击其他功能点换浏览器重新登录后࿰…...
GitLab创建仓库分配权限
文章目录创建仓库分配权限参考资料创建仓库 点击“New project”创建新项目 分配权限 点击左侧菜单栏“Members”成员,菜单 “Invite member”邀请成员,添加人员;“Invite group”邀请组织,添加一个组织所有成员下面输入框搜索…...
代码随想录-51-110.平衡二叉树
目录前言题目1.求高度和深度的区别节点的高度节点的深度2. 本题思路分析:3. 算法实现4. pop函数的算法复杂度5. 算法坑点前言 在本科毕设结束后,我开始刷卡哥的“代码随想录”,每天一节。自己的总结笔记均会放在“算法刷题-代码随想录”该专…...
项目实战典型案例27——对生产环境以及生产数据的敬畏之心
对生产环境以及生产数据的敬畏之心一:背景介绍总结升华一:背景介绍 本篇博客是对项目开发中出现的对生产环境以及生产数据的敬畏之心行的总结并进行的改进。目的是将经历转变为自己的经验。通过博客的方式分享给大家,大家一起共同进步和提高…...
如何查找你的IP地址?通过IP地址能直接定位到你家!
我们ip地址分为A、B、C、D、E共5类,每一类地址范围不同,从A到Eip地址范围依次递减,其中哦,D和E是保留地址,我们用不了。A、B、C3类地址很多都被美国这样的西方国家分走了,而留给我们的就剩有限的地址了&…...
Containers--array类
Array 类 简介 Array 类是一个固定大小的数组,它的大小在编译时就已经确定了。Array 类的大小是固定的,因此它的大小不能改变。 数组是固定大小的序列容器:它们以严格的线性顺序保存特定数量的元素。 在内部,数组除了包含的元素之外不保留…...
LinqConnect兼容性并支持Visual Studio 2022版本
LinqConnect兼容性并支持Visual Studio 2022版本 现在支持Microsoft Visual Studio 2022版本17.5预览版。 添加了Microsoft.NET 7兼容性。 共享代码-共享相同的代码,以便在不同的平台上处理数据。LinqConnect是一种数据库连接解决方案,适用于不同的基于.…...
流量监管与整形
流量监管与整形概览流量监管介绍流量监管令牌桶流量监管的具体实现单桶单速流量监管双桶单速流量监管双桶双速流量监管流量整形介绍GTS(Generic Traffic Shaping)LR(Line Rate)流量整形与流量监管的区别概览 流量整形是对报文的速…...
详解init 容器
什么是init容器 init 容器是一种特殊容器,在 Pod 内的应用容器启动之前运行。Init 容器可以包括一些应用镜像中不存在的实用工具和安装脚本。 你可以在 Pod 的规约中与用来描述应用容器的 containers 数组平行的位置指定 Init 容器 每个 Pod 中可以包含多个容器&…...
RequestResponseBodyMethodProcessor
既是一个参数解析器,也是一个返回结果处理器。 1.持有消息转换器的集合 protected final List<HttpMessageConverter<?>> messageConverters;2.作为参数解析器,例如对RequestBody标识的参数进行解析 判断是否支持当前类型的参数 Overrid…...
函数的极限
目录 函数的极限 函数极限的定义: 例题: 左右极限: 自变量趋于无穷大时函数的极限: 例题: 函数极限的性质: 函数极限与数列极限之间的关系: 函数的极限 函数极限的定义: 一句…...
dnf命令使用
1. 简介 DNF是新一代的rpm软件包管理器。他首先出现在 Fedora 18 这个发行版中。而最近,它取代了yum,正式成为 Fedora 22 的包管理器 DNF包管理器克服了YUM包管理器的一些瓶颈,提升了包括用户体验,内存占用,依赖分析…...
CLIP CLAP
文章目录CLIPabstractintroCLAP: LEARNING AUDIO CONCEPTS FROM NATURAL LANGUAGE SUPERVISIONabstractmethodCLIP open AI2021.2代码&预训练模型 abstract 原有的基于有监督数据训练的计算机分类任务,在面对新的分类目标时泛化性和可用性都会变差࿱…...
Debezium报错处理系列之五十二:解决Sql Server数据库安装后修改主机名导致sqlserver数据库实例名称没有修改从而无法设置CDC的问题
Debezium报错处理系列之五十二:解决Sql Server数据库安装后修改主机名导致sqlserver数据库实例名称没有修改从而无法设置CDC的问题 一、完整报错二、错误原因三、解决方法Debezium报错处理系列一:The db history topic is missing. Debezium报错处理系列二:Make sure that t…...
scratch老鹰捉小鸡 电子学会图形化编程scratch等级考试二级真题和答案解析2022年12月
目录 scratch老鹰捉小鸡 一、题目要求 1、准备工作 2、功能实现 二、案例分析 <...
概率论小课堂:公理化过程(大数据方法解决问题的理论基础)
文章目录 引言I 初等概率论1.1 19世纪概率论的最大难题1.2 伯努利版本的大数定理1.3 切比雪夫版本的大数定理II 现代概率论(用公理来描述概率论)2.1 柯尔莫哥洛夫2.1 用公理来描述概率论III 最基本的概率论定理3.1 互补事件的概率之和等于13.2 不可能事件的概率为零引言 前苏…...
WOW64 IsWow64Process GetNativeSystemInfoWindows System32 SysWOW64
最近开发有遇到这方面的一些知识点,在此记录下。首先,什么是WOW64?在知道这个之前我觉得需要了解一下,C:\\Windows\\System32和C:\\Winodws\\SysWOW64这两个文件夹的区别,Windows系统最开始的时候出的就是32bit的系统&…...
Spring Boot 3.0系列【10】核心特性篇之应用配置的高阶用法
有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot版本3.0.3 源码地址:https://gitee.com/pearl-organization/study-spring-boot3 文章目录 前言1. 命令行2. JSON3. 外部化配置3.1 配置文件加载位置3.2 导入配置3.2 属性占位符4. 加密配置5. 加载YML配置文件6. 配…...
Java int类型数值比较总结
如果是int类型,判断相等的话直接使用 ""来判断,例如: int i 10; int j 10; System.out.print(i j); 如果是Integer类型,则可以使用equals方法进行相等比较。 int与Integer的基本使用对比 (…...
Pyspark基础入门5_RDD的持久化方法
Pyspark 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hi…...
汽车娱乐系统解决方案
Danlaw在汽车和航空航天行业里是全球知名的技术和服务供应商,致力于提供更加安全与智能的系统。Danlaw以突破性技术和高效开发、动态环境的自适应解决方案而闻名。Danlaw优秀的联网汽车解决方案使之成为全球大型互联设备供应商之一。 一 信息娱乐系统测试 | 风丘科…...
Go语言结构体,这一篇就够了
Go语言结构体,这一篇就够了1.结构体的概念2.结构体的定义3.结构体的实例化4.结构体初始化5.构造函数6.方法和接收者方法接收者7.嵌套结构体8.结构体的“继承”9.结构体与JSON序列化10.结构体标签(Tag)Go语言中没有“类”的概念,也…...
【python】各种排序算法代码大集合
超级好用的口诀: 时间复杂度:快些以nlogn的速度归队。 稳定性:心情不稳定,快些选一堆好友来聊天吧。 直接插容易插变O(N),起泡起得好变O(N).(初始序列已经有序) 插入排序法在近乎有序的情况下,效率特别高,通过插入排序,可以引申出希尔排序 归并排序:左半部分排好序…...
手机wordpress无法登录/百度提交收录入口
2019独角兽企业重金招聘Python工程师标准>>> 1.Redis简介 Redis 是一个开源(BSD许可)的,内存中的key-value数据结构存储系统,它可以用作数据库、缓存和消息中间件,解决了断电后数据完全丢失的情况。它支持多…...
为什么用MyEclipse做网站/清远疫情防控措施
记事本原本是电脑上的一个记事小工具,后来成了记事工具的代表。其实,手机上的记事工具是种类非常多,除了记事本外,便签、备忘录也是非常常用的记事工具。这些记事工具虽然名字不同,但是功能大同小异。 虽然应用商城里…...
wordpress只显示文章标题摘要/百度云引擎搜索
我们知道SQL Server和Oracle其实很多原理都类似.特别是一些常用的SQL语句都是按照标准来.所以它们也可以有一定的互操作性的.这里讲一下,怎么配置让SQL Server连接一个Oracle.然后你在SQL Server中也能查看Oracle中表的内容. 我先说下我使用的环境: 操作系统: win7 64 ,SQL …...
怎么做一元购物网站/真正免费建站
上篇 SpringBootMongoDB实现一物流订单系统(上) 本文收录在公众号:bigsai第三步 订单更新(追加订单)创建完订单之后,无数配送公司和人员便开始配送物流,而我们在查询的时候,物流状态信息也能够时不时的刷新,具体物流信…...
做网站需要前置审批/百度快速排名软件下载
chatGPT基于所谓的大模型,这里有两个关键词一个是“大”,一个是“模型”,我们先看什么叫“模型”。所谓模型其实就是深度学习中的神经网络,后者由很多个称之为“神经元”基本单元组成。神经元是一种基础计算单元,它执行…...
电商网站页面/营销网站建设选择原则
最近在看 UNIX 网络编程并研究了一下 Redis 的实现,感觉 Redis 的源代码十分适合阅读和分析,其中 I/O 多路复用(mutiplexing)部分的实现非常干净和优雅,在这里想对这部分的内容进行简单的整理。几种 I/O 模型为什么 Redis 中要使用 I/O 多路复…...