当前位置: 首页 > news >正文

RocketMQ消息短暂而又精彩的一生(荣耀典藏版)

 

目录

前言

一、核心概念

二、消息诞生与发送

2.1.路由表

2.2.队列的选择

2.3.其它特殊情况处理

2.3.1.发送异常处理

2.3.2.消息过大的处理

三、消息存储

3.1.如何保证高性能读写

3.1.1.传统IO读写方式

3.2零拷贝

3.2.1.mmap()

3.2.2sendfile()

3.2.3.CommitLog

3.3.刷盘机制

3.3.1.异步刷盘

3.3.2.同步刷盘

四、高可用

4.1.主从同步模式

4.2.Dledger模式

五、消息消费

5.1.消费的两种模式

5.1.1.集群模式

5.1.2.广播模式

5.1.2.1.ConsumeQueue

5.2.RocketMQ如何实现消息的顺序性

六、消息清理

七、消息的一生总结

7.1.消息发送

7.2.消息存储

7.3.高可用

7.4.消息消费

7.5.消息清理


前言

大家好,我是月夜枫~~

这篇文章我准备来聊一聊RocketMQ消息的一生。

RocketMQ是什么呢?

RocketMQ是一个分布式消息中间件,‌专为处理万亿级超大规模的消息而设计。‌

RocketMQ由阿里巴巴在2012年开发,‌旨在提供高吞吐量、‌低延迟、‌海量堆积、‌顺序收发等特点的消息处理服务。‌它支持多种消息类型,‌包括普通消息、‌顺序消息、‌事务消息和定时/延时消息,‌满足微服务与大数据场景的需求。‌RocketMQ 5.0版本进一步发展,‌成为云原生“消息、‌事件、‌流”实时数据处理平台,‌覆盖云边端一体化数据处理场景。‌它支持云原生特性,‌如弹性扩展、‌高吞吐量保证、‌流处理引擎、‌金融级稳定性、‌简化的架构和友好的生态系统。‌此外,‌RocketMQ还支持海量Topic需求,‌通过配置化和低代码的方式进行数据集成,‌可与任意系统建立连接,‌适用于构建流式ETL、‌数据管道、‌数据湖等应用场景。‌

RocketMQ的设计初衷是为了应对阿里巴巴双十一购物狂欢节等大规模互联网业务场景的挑战,‌并在2016年被捐赠给Apache基金会,‌成为Apache的顶级项目。‌它的成功得益于创始团队和众多开发者的努力,‌以及其满足不断丰富的业务场景的功能发布,‌如事务消息、‌SQL过滤、‌轨迹追踪等。‌RocketMQ的开源版本为更多开发者提供了服务,‌帮助企业构建现代应用,‌体验大规模的云计算实践。

不知你是否跟我一样,在使用RocketMQ的时候也有很多的疑惑:

  • 消息是如何发送的,队列是如何选择的?

  • 消息是如何存储的,是如何保证读写的高性能?

  • RocketMQ是如何实现消息的快速查找的?

  • RocketMQ是如何实现高可用的?

  • 消息是在什么时候会被清除?

  • ...

本文就通过探讨上述问题来探秘消息在RocketMQ中短暂而又精彩的一生。

一、核心概念

  • NameServer:可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。

  • Broker:核心的一个角色,主要是用来保存消息的,在启动时会向NameServer进行注册。Broker实例可以有很多个,相同的BrokerName可以称为一个Broker组,每个Broker组只保存一部分消息。

  • topic:可以理解为一个消息的集合的名字,一个topic可以分布在不同的Broker组下。

  • 队列(queue):一个topic可以有很多队列,默认是一个topic在同一个Broker组中是4个。如果一个topic现在在2个Broker组中,那么就有可能有8个队列。

  • 生产者:生产消息的一方就是生产者。

  • 生产者组:一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组。

  • 消费者:用来消费生产者消息的一方。

  • 消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。

二、消息诞生与发送

我们都知道,消息是由业务系统在运行过程产生的,当我们的业务系统产生了消息,我们就可以调用RocketMQ提供的API向RocketMQ发送消息,就像下面这样。

DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
//指定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
//启动生产者
producer.start();
//省略代码。。
Message msg = new Message("sanyouTopic", "TagA", "张三的按摩日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并得到消息的发送结果,然后打印
SendResult sendResult = producer.send(msg);

虽然代码很简单,我们不经意间可能会思考如下问题:

  • 代码中只设置了NameServer的地址,那么生产者是如何知道Broker所在机器的地址,然后向Broker发送消息的?

  • 一个topic会有很多队列,那么生产者是如何选择哪个队列发送消息?

  • 消息一旦发送失败了怎么办?

2.1.路由表

当Broker在启动的过程中,Broker就会往NameServer注册自己这个Broker的信息,这些信息就包括自身所在服务器的ip和端口,还有就是自己这个Broker有哪些topic和对应的队列信息,这些信息就是路由信息,后面就统一称为路由表。

Broker向NameServer注册

当生产者启动的时候,会从NameServer中拉取到路由表,缓存到本地,同时会开启一个定时任务,默认是每隔30s从NameServer中重新拉取路由信息,更新本地缓存。

2.2.队列的选择

好了通过上一节我们就明白了,原来生产者会从NameServer拉取到Broker的路由表的信息,这样生产者就知道了topic对应的队列的信息了。

但是由于一个topic可能会有很多的队列,那么应该将消息发送到哪个队列上呢?

面对这种情况,RocketMQ提供了两种消息队列的选择算法。

  • 轮询算法

  • 最小投递延迟算法

轮询算法 就是一个队列一个队列发送消息,这些就能保证消息能够均匀分布在不同的队列底下,这也是RocketMQ默认的队列选择算法。

但是由于机器性能或者其它情况可能会出现某些Broker上的Queue可能投递延迟较严重,这样就会导致生产者不能及时发消息,造成生产者压力过大的问题。所以RocketMQ提供了最小投递延迟算法。

最小投递延迟算法 每次消息投递的时候会统计投递的时间延迟,在选择队列的时候会优先选择投递延迟时间小的队列。这种算法可能会导致消息分布不均匀的问题。

如果你想启用最小投递延迟算法,只需要按如下方法设置一下即可。

producer.setSendLatencyFaultEnable(true);

当然除了上述两种队列选择算法之外,你也可以自定义队列选择算法,只需要实现MessageQueueSelector接口,在发送消息的时候指定即可。

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {//从mqs中选择一个队列return null;}
}, new Object());

MessageQueueSelector RocketMQ也提供了三种实现

  • 随机算法

  • Hash算法

  • 根据机房选择算法(空实现)

2.3.其它特殊情况处理

2.3.1.发送异常处理

终于,不论是通过RocketMQ默认的队列选择算法也好,又或是自定义队列选择算法也罢,终于选择到了一个队列,那么此时就可以跟这个队列所在的Broker机器建立网络连接,然后通过网络请求将消息发送到Broker上。

但是不幸的事发生了,Broker挂了,又或者是机器负载太高了,发送消息超时了,那么此时RockerMQ就会进行重试。

RockerMQ重试其实很简单,就是重新选择其它Broker机器中的一个队列进行消息发送,默认会重试两次。

当然如果你的机器比较多,可以将设置重试次数设置大点。

producer.setRetryTimesWhenSendFailed(10);
2.3.2.消息过大的处理

一般情况下,消息的内容都不会太大,但是在一些特殊的场景中,消息内容可能会出现很大的情况。

遇到这种消息过大的情况,比如在默认情况下消息大小超过4k的时候,RocketMQ是会对消息进行压缩之后再发送到Broker上,这样在消息发送的时候就可以减少网络资源的占用。

三、消息存储

好了,经过以上环节Broker终于成功接收到了生产者发送的消息了,但是为了能够保证Broker重启之后消息也不丢失,此时就需要将消息持久化到磁盘。

3.1.如何保证高性能读写

由于涉及到消息持久化操作,就涉及到磁盘数据的读写操作,那么如何实现文件的高性能读写呢?这里就不得不提到的一个叫零拷贝的技术。

3.1.1.传统IO读写方式

说零拷贝之前,先说一下传统的IO读写方式。

比如现在需要将磁盘文件通过网络传输出去,那么整个传统的IO读写模型如下图所示:

 

传统的IO读写其实就是read + write的操作,整个过程会分为如下几步

  • 用户调用read()方法,开始读取数据,此时发生一次上下文从用户态到内核态的切换,也就是图示的切换1

  • 将磁盘数据通过DMA拷贝到内核缓存区

  • 将内核缓存区的数据拷贝到用户缓冲区,这样用户,也就是我们写的代码就能拿到文件的数据

  • read()方法返回,此时就会从内核态切换到用户态,也就是图示的切换2

  • 当我们拿到数据之后,就可以调用write()方法,此时上下文会从用户态切换到内核态,即图示切换3

  • CPU将用户缓冲区的数据拷贝到Socket缓冲区

  • 将Socket缓冲区数据拷贝至网卡

  • write()方法返回,上下文重新从内核态切换到用户态,即图示切换4

整个过程发生了4次上下文切换和4次数据的拷贝,这在高并发场景下肯定会严重影响读写性能。

所以为了减少上下文切换次数和数据拷贝次数,就引入了零拷贝技术。

3.2零拷贝

零拷贝技术是一个思想,指的是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。

实现零拷贝的有以下几种方式

  • mmap()

  • sendfile()

3.2.1.mmap()

mmap(memory map)是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。

简单地说就是内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。

比如基于mmap,上述的IO读写模型就可以变成这样。

基于mmap IO读写其实就变成mmap + write的操作,也就是用mmap替代传统IO中的read操作。

当用户发起mmap调用的时候会发生上下文切换1,进行内存映射,然后数据被拷贝到内核缓冲区,mmap返回,发生上下文切换2;随后用户调用write,发生上下文切换3,将内核缓冲区的数据拷贝到Socket缓冲区,write返回,发生上下文切换4。

整个过程相比于传统IO主要是不用将内核缓冲区的数据拷贝到用户缓冲区,而是直接将数据拷贝到Socket缓冲区。上下文切换的次数仍然是4次,但是拷贝次数只有3次,少了一次CPU拷贝。

在Java中,提供了相应的api可以实现mmap,当然底层也还是调用Linux系统的mmap()实现的

FileChannel fileChannel = new RandomAccessFile("test.txt", "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());

如上代码拿到MappedByteBuffer,之后就可以基于MappedByteBuffer去读写。

3.2.2sendfile()

sendfile()跟mmap()一样,也会减少一次CPU拷贝,但是它同时也会减少两次上下文切换。

如图,用户在发起sendfile()调用时会发生切换1,之后数据通过DMA拷贝到内核缓冲区,之后再将内核缓冲区的数据CPU拷贝到Socket缓冲区,最后拷贝到网卡,sendfile()返回,发生切换2。

同样地,Java也提供了相应的api,底层还是操作系统的sendfile()

FileChannel channel = FileChannel.open(Paths.get("./test.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
//调用transferTo方法向目标数据传输
channel.transferTo(position, len, target);

通过FileChannel的transferTo方法即可实现。

transferTo方法(sendfile)主要是用于文件传输,比如将文件传输到另一个文件,又或者是网络。

在如上代码中,并没有文件的读写操作,而是直接将文件的数据传输到target目标缓冲区,也就是说,sendfile是无法知道文件的具体的数据的;但是mmap不一样,他是可以修改内核缓冲区的数据的。假设如果需要对文件的内容进行修改之后再传输,只有mmap可以满足。

通过上面的一些介绍,主要就是一个结论,那就是基于零拷贝技术,可以减少CPU的拷贝次数和上下文切换次数,从而可以实现文件高效的读写操作。

RocketMQ内部主要是使用基于mmap实现的零拷贝(其实就是调用上述提到的api),用来读写文件,这也是RocketMQ为什么快的一个很重要原因。

RocketMQ中使用mmap代码
3.2.3.CommitLog

前面提到消息需要持久化到磁盘文件中,而CommitLog其实就是存储消息的文件的一个称呼,所有的消息都存在CommitLog中,一个Broker实例只有一个CommitLog。

由于消息数据可能会很大,同时兼顾内存映射的效率,不可能将所有消息都写到同一个文件中,所以CommitLog在物理磁盘文件上被分为多个磁盘文件,每个文件默认的固定大小是1G。

当生产者将消息发送过来的时候,就会将消息按照顺序写到文件中,当文件空间不足时,就会重新建一个新的文件,消息写到新的文件中。

消息在写入到文件时,不仅仅会包含消息本身的数据,也会包含其它的对消息进行描述的数据,比如这个消息来自哪台机器、消息是哪个topic的、消息的长度等等,这些数据会和消息本身按照一定的顺序同时写到文件中,所以图示的消息其实是包含消息的描述信息的。

3.3.刷盘机制

RocketMQ在将消息写到CommitLog文件中时并不是直接就写到文件中,而是先写到PageCache,也就是前面说的内核缓存区,所以RocketMQ提供了两种刷盘机制,来将内核缓存区的数据刷到磁盘。

3.3.1.异步刷盘

异步刷盘就是指Broker将消息写到PageCache的时候,就直接返回给生产者说消息存储成功了,然后通过另一个后台线程来将消息刷到磁盘,这个后台线程是在RokcetMQ启动的时候就会开启。异步刷盘方式也是RocketMQ默认的刷盘方式。

其实RocketMQ的异步刷盘也有两种不同的方式,一种是固定时间,默认是每隔0.5s就会刷一次盘;另一种就是频率会快点,就是每存一次消息就会通知去刷盘,但不会去等待刷盘的结果,同时如果0.5s内没被通知去刷盘,也会主动去刷一次盘。默认的是第一种固定时间的方式。

3.3.2.同步刷盘

同步刷盘就是指Broker将消息写到PageCache的时候,会等待异步线程将消息成功刷到磁盘之后再返回给生产者说消息存储成功。

同步刷盘相对于异步刷盘来说消息的可靠性更高,因为异步刷盘可能出现消息并没有成功刷到磁盘时,机器就宕机的情况,此时消息就丢了;但是同步刷盘需要等待消息刷到磁盘,那么相比异步刷盘吞吐量会降低。所以同步刷盘适合那种对数据可靠性要求高的场景。

如果你需要使用同步刷盘机制,只需要在配置文件指定一下刷盘机制即可。

四、高可用

在说高可用之前,先来完善一下前面的一些概念。

在前面介绍概念的时候也说过,一个RokcetMQ中可以有很多个Broker实例,相同的BrokerName称为一个组,同一个Broker组下每个Broker实例保存的消息是一样的,不同的Broker组保存的消息是不一样的。

 

如图所示,两个BrokerA实例组成了一个Broker组,两个BrokerB实例也组成了一个Broker组。

前面说过,每个Broker实例都有一个CommitLog文件来存储消息的。那么两个BrokerA实例他们CommitLog文件存储的消息是一样的,两个BrokerB实例他们CommitLog文件存储的消息也是一样的。

那么BrokerA和BrokerB存的消息不一样是什么意思呢?

其实很容易理解,假设现在有个topicA存在BrokerA和BrokerB上,那么topicA在BrokerA和BrokerB默认都会有4个队列。

前面在说发消息的时候需要选择一个队列进行消息的发送,假设第一次选择了BrokerA上的队列发送消息,那么此时这条消息就存在BrokerA上,假设第二次选择了BrokerB上的队列发送消息,那么那么此时这条消息就存在BrokerB上,所以说BrokerA和BrokerB存的消息是不一样的。

那么为什么同一个Broker组内的Broker存储的消息是一样的呢?其实比较容易猜到,就是为了保证Broker的高可用,这样就算Broker组中的某个Broker挂了,这个Broker组依然可以对外提供服务。

那么如何实现同Broker组的Broker存的消息数据相同的呢?这就不得不提到Broker的高可用模式。

RocketMQ提供了两种Broker的高可用模式:

  • 主从同步模式

  • Dledger模式

4.1.主从同步模式

在主从同步模式下,在启动的时候需要在配置文件中指定BrokerId,在同一个Broker组中,BrokerId为0的是主节点(master),其余为从节点(slave)。

当生产者将消息写入到主节点是,主节点会将消息内容同步到从节点机器上,这样一旦主节点宕机,从节点机器依然可以提供服务。

主从同步主要同步两部分数据

  • topic等数据

  • 消息

topic等数据是从节点每隔10s钟主动去主节点拉取,然后更新本身缓存的数据。

消息是主节点主动推送到从节点的。当主节点收到消息之后,会将消息通过两者之间建立的网络连接发送出去,从节点接收到消息之后,写到CommitLog即可。

从节点有两种方式知道主节点所在服务器的地址,第一种就是在配置文件指定;第二种就是从节点在注册到NameServer的时候会返回主节点的地址。

主从同步模式有一个比较严重的问题就是如果集群中的主节点挂了,这时需要人为进行干预,手动进行重启或者切换操作,而非集群自己从从节点中选择一个节点升级为主节点。

为了解决上述的问题,所以RocketMQ在4.5.0就引入了Dledger模式。

4.2.Dledger模式

在Dledger模式下的集群会基于Raft协议选出一个节点作为leader节点,当leader节点挂了后,会从follower中自动选出一个节点升级成为leader节点。所以Dledger模式解决了主从模式下无法自动选择主节点的问题。

在Dledger集群中,leader节点负责写入消息,当消息写入leader节点之后,leader会将消息同步到follower节点,当集群中过半数(节点数/2 +1)节点都成功写入了消息,这条消息才算真正写成功。

至于选举的细节,这里就不多说了,有兴趣的可以自行谷歌,还是挺有意思的。

五、消息消费

终于,在生产者成功发送消息到Broker,Broker在成功存储消息之后,消费者要消费消息了。

消费者在启动的时候会从NameSrever拉取消费者订阅的topic的路由信息,这样就知道订阅的topic有哪些queue,以及queue所在Broker的地址信息。

为什么消费者需要知道topic对应的哪些queue呢?

其实主要是因为消费者在消费消息的时候是以队列为消费单元的,消费者需要告诉Broker拉取的是哪个队列的消息,至于如何拉到消息的,后面再说。

5.1.消费的两种模式

前面说过,消费者是有个消费者组的概念,在启动消费者的时候会指定该消费者属于哪个消费者组。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");

一个消费者组中可以有多个消费者,不同消费者组之间消费消息是互不干扰的。

在同一个消费者组中,消息消费有两种模式。

  • 集群模式

  • 广播模式

5.1.1.集群模式

同一条消息只能被同一个消费组下的一个消费者消费,也就是说,同一条消息在同一个消费者组底下只会被消费一次,这就叫集群消费。

集群消费的实现就是将队列按照一定的算法分配给消费者,默认是按照平均分配的。

如图所示,将每个队列分配只分配给同一个消费者组中的一个消费者,这样消息就只会被一个消费者消费,从而实现了集群消费的效果。

RocketMQ默认是集群消费的模式。

5.1.2.广播模式

广播模式就是同一条消息可以被同一个消费者组下的所有消费者消费。

其实实现也很简单,就是将所有队列分配给每个消费者,这样每个消费者都能读取topic底下所有的队列的数据,就实现了广播模式。

如果你想使用广播模式,只需要在代码中指定即可。

consumer.setMessageModel(MessageModel.BROADCASTING);
5.1.2.1.ConsumeQueue

上一节我们提到消费者是从队列中拉取消息的,但是这里不经就有一个疑问,那就是消息明明都存在CommitLog文件中的,那么是如何去队列中拉的呢?难道是去遍历所有的文件,找到对应队列的消息进行消费么?

答案是否定的,因为这种每次都遍历数据的效率会很低,所以为了解决这种问题,引入了ConsumeQueue的这个概念,而消费实际是从ConsumeQueue中拉取数据的。

用户在创建topic的时候,Broker会为topic创建队列,并且每个队列其实会有一个编号queueId,每个队列都会对应一个ConsumeQueue,比如说一个topic在某个Broker上有4个队列,那么就有4个ConsumeQueue。

前面说过,消息在发送的时候,会根据一定的算法选择一个队列,之后再发送消息的时候会携带选择队列的queueId,这样Broker就知道消息属于哪个队列的了。当消息被存到CommitLog之后,其实还会往这条消息所在的队列的ConsumeQueue插一条数据。

ConsumeQueue也是由多个文件组成,每个文件默认是存30万条数据。

插入ConsumeQueue中的每条数据由20个字节组成,包含3部分信息,消息在CommitLog的起始位置(8个字节),消息在CommitLog存储的长度(8个字节),还有就是tag的hashCode(4个字节)。

所以当消费者从Broker拉取消息的时候,会告诉Broker拉取哪个队列(queueId)的消息、这个队列的哪个位置的消息(queueOffset)。

queueOffset就是指上图中ConsumeQueue一条数据的编号,单调递增的。

Broker在接受到消息的时候,找个指定队列的ConsumeQueue,由于每条数据固定是20个字节,所以可以轻易地计算出queueOffset对应的那条数据在哪个文件的哪个位置上,然后读出20个字节,从这20个字节中在解析出消息在CommitLog的起始位置和存储的长度,之后再到CommitLog中去查找,这样就找到了消息,然后在进行一些处理操作返回给消费者。

到这,我们就清楚的知道消费者是如何从队列中拉取消息的了,其实就是先从这个队列对应的ConsumeQueue中找到消息所在CommmitLog中的位置,然后再从CommmitLog中读取消息的。

5.2.RocketMQ如何实现消息的顺序性

这里插入一个比较常见的一个面试,那么如何保证保证消息的顺序性。

其实要想保证消息的顺序只要保证以下三点即可

  • 生产者将需要保证顺序的消息发送到同一个队列

  • 消息队列在存储消息的时候按照顺序存储

  • 消费者按照顺序消费消息

第一点如何保证生产者将消息发送到同一个队列?

上文提到过RocketMQ生产者在发送消息的时候需要选择一个队列,并且选择算法是可以自定义的,这样我们只需要在根据业务需要,自定义队列选择算法,将顺序消息都指定到同一个队列,在发送消息的时候指定该算法,这样就实现了生产者发送消息的顺序性。

第二点,RocketMQ在存消息的时候,是按照顺序保存消息在ConsumeQueue中的位置的,由于消费消息的时候是先从ConsumeQueue查找消息的位置,这样也就保证了消息存储的顺序性。

第三点消费者按照顺序消费消息,这个RocketMQ已经实现了,只需要在消费消息的时候指定按照顺序消息消费即可,如下面所示,注册消息的监听器的时候使用MessageListenerOrderly这个接口的实现。

consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {//按照顺序消费消息记录return null;}
});

六、消息清理

由于消息是存磁盘的,但是磁盘空间是有限的,所以对于磁盘上的消息是需要清理的。

当出现以下几种情况下时就会触发消息清理:

  • 手动执行删除

  • 默认每天凌晨4点会自动清理过期的文件

  • 当磁盘空间占用率默认达到75%之后,会自动清理过期文件

  • 当磁盘空间占用率默认达到85%之后,无论这个文件是否过期,都会被清理掉

上述过期的文件是指文件最后一次修改的时间超过72小时(默认情况下),当然如果你的老板非常有钱,服务器的磁盘空间非常大,可以将这个过期时间修改的更长一点。

有的小伙伴肯定会有疑问,如果消息没有被消息,那么会被清理么?

答案是会被清理的,因为清理消息是直接删除CommitLog文件,所以只要达到上面的条件就会直接删除CommitLog文件,无论文件内的消息是否被消费过。

当消息被清理完之后,消息也就结束了它精彩的一生。

七、消息的一生总结

为了更好地理解本文,这里再来总结一下RokcetMQ消息一生的各个环节。

7.1.消息发送

  • 生产者产生消息

  • 生产者在发送消息之前会拉取topic的路由信息

  • 根据队列选择算法,从topic众多的队列中选择一个队列

  • 跟队列所在的Broker机器建立网络连接,将消息发送到Broker上

7.2.消息存储

  • Broker接收到生产者的消息将消息存到CommitLog中

  • 在CosumeQueue中存储这条消息在CommitLog中的位置

由于CommitLog和CosumeQueue都涉及到磁盘文件的读写操作,为了提高读写效率,RokcetMQ使用到了零拷贝技术,其实就是调用了一下Java提供的api。。

7.3.高可用

如果是集群模式,那么消息会被同步到从节点,从节点会将消息存到自己的CommitLog文件中。这样就算主节点挂了,从节点仍然可以对外提供访问。

7.4.消息消费

  • 消费者会拉取订阅的Topic的路由信息,根据集群消费或者广播消费的模式来选择需要拉取消息的队列

  • 与队列所在的机器建立连接,向Broker发送拉取消息的请求

  • Broker在接收到请求知道,找到队列对应的ConsumeQueue,然后计算出拉取消息的位置,再解析出消息在CommitLog中的位置

  • 根据解析出的位置,从CommitLog中读出消息的内容返回给消费者

7.5.消息清理

由于消息是存在磁盘的,而磁盘的空间是有限的,所以RocketMQ会根据一些条件去清理CommitLog文件。

最后说一句(求关注,别白嫖我)
如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。
我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

相关文章:

RocketMQ消息短暂而又精彩的一生(荣耀典藏版)

目录 前言 一、核心概念 二、消息诞生与发送 2.1.路由表 2.2.队列的选择 2.3.其它特殊情况处理 2.3.1.发送异常处理 2.3.2.消息过大的处理 三、消息存储 3.1.如何保证高性能读写 3.1.1.传统IO读写方式 3.2零拷贝 3.2.1.mmap() 3.2.2sendfile() 3.2.3.CommitLog …...

Linux中的文件操作

linux中exec*为加载器&#xff0c;可以将程序加载到内存。 main()函数也是函数&#xff0c;也要被调用&#xff0c;也要被传参 故在一个程序中exec*系列的函数先被执行 程序替换中execve是系统调用&#xff0c;其他的都是封装。 进程程序替换 1.创建子进程的目的&#xff1…...

[排序]hoare快速排序

今天我们继续来讲排序部分&#xff0c;顾名思义&#xff0c;快速排序是一种特别高效的排序方法&#xff0c;在C语言中qsort函数&#xff0c;底层便是用快排所实现的&#xff0c;快排适用于各个项目中&#xff0c;特别的实用&#xff0c;下面我们就由浅入深的全面刨析快速排序。…...

freertos的学习cubemx版

HAL 库的freertos 1 实时 2 任务->线程 3 移植 CMSIS_V2 V1版本 NVIC配置全部是抢占优先级 第四组 抢占级别有 0-15 编码规则&#xff0c; 变量名 &#xff1a;类型前缀&#xff0c; c - char S - int16_t L - int32_t U - unsigned Uc - uint8_t Us - uint…...

PyQt 信号与槽功能

PyQt 信号与槽功能 基本概念&#xff1a;在 PyQt 中&#xff0c;信号&#xff08;Signal&#xff09;与槽&#xff08;Slot&#xff09;是一种用于对象之间通信的机制。信号可以由一个对象发出&#xff0c;而槽是用于接收信号并执行相应操作的函数。 信号 信号是在 PyQt 的类…...

navicat premium安装和破解

https://blog.csdn.net/qq1031893936/article/details/90264688 提示信息 - 吾爱破解 - LCG - LSG |安卓破解|病毒分析|www.52pojie.cn...

OSI七层模型

OSI&#xff08;Open System Interconnect&#xff09;&#xff0c;即开放式系统互连。 该体系结构标准定义了网络互连的七层框架&#xff08;物理层、数据链路层、网络层、传输层、会话层、表示层和应用层 &#xff09;&#xff0c;即OSI开放系统互连参考模型。 应用层 为用…...

Qt自定义MessageToast

效果&#xff1a; 文字长度自适应&#xff0c;自动居中到parent&#xff0c;会透明渐变消失。 CustomToast::MessageToast(QS("最多添加50张图片"),this);1. CustomToast.h #pragma once#include <QFrame>class CustomToast : public QFrame {Q_OBJECT pub…...

自动化测试 pytest 中 scope 限制 fixture使用范围!

导读 fixture 是 pytest 中一个非常重要的模块&#xff0c;可以让代码更加简洁。 fixture 的 autouse 为 True 可以自动化加载 fixture。 如果不想每条用例执行前都运行初始化方法(可能多个fixture)怎么办&#xff1f;可不可以只运行一次初始化方法&#xff1f; 答&#xf…...

软件-vscode-plantUML-drawio

文章目录 vscode基础命令 实操1. vscode实现springboot项目搭建 &#xff08;包括spring data jpa和sqlLite连接&#xff09; PlantUMLDrawio基础实操 vscode 基础 命令 启动mysql命令 docker run --name mysql-container -e MYSQL_ROOT_PASSWORD123456 -p 3306:3306 -d my…...

Python爬虫实战案例(爬取图片)

爬取图片的信息 爬取图片与爬取文本内容相似&#xff0c;只是需要加上图片的url&#xff0c;并且在查找图片位置的时候需要带上图片的属性。 这里选取了一个4K高清的壁纸网站&#xff08;彼岸壁纸https://pic.netbian.com&#xff09;进行爬取。 具体步骤如下&#xff1a; …...

智慧工地视频汇聚管理平台:打造现代化工程管理的全新视界

一、方案背景 科技高速发展的今天&#xff0c;工地施工已发生翻天覆地的变化&#xff0c;传统工地管理模式很容易造成工地管理混乱、安全事故、数据延迟等问题&#xff0c;人力资源的不足也进一步加剧了监管不到位的局面&#xff0c;严重影响了施工进度质量和安全。 视频监控…...

ASP.NET中的六大对象有哪些?以及各自的功能以及使用方式

在ASP.NET Web Forms中&#xff0c;并没有严格意义上的“六大对象”&#xff0c;但通常我们指的是与HTTP请求和响应处理紧密相关的几个内置对象。以下是这些对象及其功能、使用方式以及简单的实现源码示例&#xff1a; Response对象 功能&#xff1a;用于向客户端发送HTTP响应…...

Elastic 及阿里云 AI 搜索 Tech Day 将于 7 月 27 日在上海举办

活动主题 面向开发者的 AI 搜索相关技术分享&#xff0c;如 RAG、多模态搜索、向量检索等。 活动介绍 参加 Elastic 原厂与阿里云联合举办的 Generative AI 技术交流分享日。借助 The Elastic Search AI Platform&#xff0c; 使用开放且灵活的企业解决方案&#xff0c;以前所…...

基于ssm+vue医院住院管理系统源码数据库

摘 要 随着时代的发展&#xff0c;医疗设备愈来愈完善&#xff0c;医院也变成人们生活中必不可少的场所。如今&#xff0c;已经2021年了&#xff0c;虽然医院的数量和设备愈加完善&#xff0c;但是老龄人口也越来越多。在如此大的人口压力下&#xff0c;医院住院就变成了一个…...

【在排序数组中查找元素的第一个和最后一个位置】python刷题记录

R2-分治 有点easy的感觉&#xff0c;感觉能用哈希表 class Solution:def searchRange(self, nums: List[int], target: int) -> List[int]:nlen(nums)dictdefaultdict(list)#初始赋值哈希表&#xff0c;记录出现次数for num in nums:if not dict[num]:dict[num]1else:dict[…...

Pytorch基础:Tensor的squeeze和unsqueeze方法

相关阅读 Pytorch基础https://blog.csdn.net/weixin_45791458/category_12457644.html?spm1001.2014.3001.5482 在Pytorch中&#xff0c;squeeze和unsqueeze是Tensor的一个重要方法&#xff0c;同时它们也是torch模块中的一个函数&#xff0c;它们的语法如下所示。 Tensor.…...

PHP压缩打包,下载目录或者文件,解压zip文件

函数 /*** 压缩整个文件夹为zip文件* 本地需要绝对路径&#xff0c;服务器需要相对路径*/function makeZipFile($zip_path , $folder_path ) {$rootPath realpath($folder_path);$zip new ZipArchive(); // $zip->open($zip_path, ZipArchive::CREATE | ZipArchi…...

后端面试题日常练-day08 【Java基础】

题目 希望这些选择题能够帮助您进行后端面试的准备&#xff0c;答案在文末 Java中的静态变量和实例变量有何区别&#xff1f; a) 静态变量属于类&#xff0c;实例变量属于对象 b) 静态变量只能在静态方法中访问&#xff0c;实例变量只能在实例方法中访问 c) 静态变量在类加载时…...

Linux:core文件无法生成排查步骤

1、进程的RLIMIT_CORE或RLIMIT_SIZE被设置为0。使用getrlimit和ulimit检查修改。 使用ulimit -a 命令检查是否开启core文件生成限制 如果发现-c后面的结果是0&#xff0c;就临时添加环境变量ulimit -c unlimited&#xff0c;之后在启动程序观察是否有core生成&#xff0c;如果…...

大模型学习资源

上一篇扯了一堆废话&#xff0c;关于大模型&#xff0c;提供一下建议 说实话&#xff0c;大模型更新太快&#xff0c;以我30岁的高龄实在不适合再去研究技术。偶然发现&#xff0c;国内的大模型厂家在做推广的培训。比如上海人工智能实验室&#xff0c;阿里&#xff0c;百度。…...

约定(模拟赛2 T3)

题目描述 小A在你的帮助下成功打开了山洞中的机关&#xff0c;虽然他并没有找到五维空间&#xff0c;但他在山洞中发现了无尽的宝藏&#xff0c;这个消息很快就传了出去。人们为了争夺洞中的宝藏相互陷害&#xff0c;甚至引发了战争&#xff0c;世界都快要毁灭了。小A非常地难…...

Java推送xml数据进行http请求

将json转成xml数据进行推送&#xff0c;打印出最终推送xml的数据格式&#xff0c;再调整代码 直接上代码&#xff0c;详情请看代码注释 public void pushReceipt(JSONObject jsonObj) {try {// 创建 XML 文档Document doc createXmlDocument();// 构建 XML 结构Element rootE…...

Docker安装 OpenResty详细教程

OpenResty 是一个基于 Nginx 的高性能 Web 平台&#xff0c;它集成了 Lua 脚本语言&#xff0c;使得开发者可以在 Nginx 服务器上轻松地进行动态 Web 应用开发。OpenResty 的核心目标是通过将 Nginx 的高性能与 Lua 的灵活性结合起来&#xff0c;提供一个强大且高效的 Web 开发…...

前端位运算运用场景小知识(权限相关)

前提&#xff1a;此篇结合AI、公司实际业务产出&#xff0c;背景是公司有个业务涉及权限&#xff0c;用位运算来控制的&#xff0c;比较新奇&#xff0c;所以记录一下(可能自己比较low) 前端js位运算一般实际的应用场景在哪 ai回答&#xff1a; 整数运算与性能优化&#xff…...

【云原生】Kubernetes中的DaemonSet介绍、原理、用法及实战应用案例分析

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…...

使用框架构建React Native应用程序的最佳实践

在React Conf上&#xff0c;我们更新了关于开始构建React Native应用程序的最佳工具的指导&#xff1a;一个React Native框架——一个包含所有必要API的工具箱&#xff0c;让您能够构建生产就绪的应用程序。 现在推荐使用React Native框架&#xff08;如Expo&#xff09;来创建…...

Godot入门 02玩家1.0版

添加Node2D节点&#xff0c;重命名Game 创建玩家场景&#xff0c;添加CharacterBody2D节点 添加AnimatedSprite2D节点 从精灵表中添加帧 选择文件 设置成8*8 图片边缘模糊改为清晰 设置加载后自动播放&#xff0c;动画循环 。动画速度10FPS&#xff0c;修改动画名称idle。 拖动…...

Docker-Compose配置zookeeper+KaFka+CMAK简单集群

1. 本地DNS解析管理 # 编辑hosts文件 sudo nano /etc/hosts # 添加以下三个主机IP 192.168.186.77 zoo1 k1 192.168.186.18 zoo2 k2 192.168.186.216 zoo3 k3注&#xff1a;zoo1是192.168.186.77的别名&#xff0c;zoo2是192.168.186.18的别名&#xff0c;zoo3是192.168.186.1…...

Python中,集合几种基本运算

在Python中&#xff0c;集合具有几种基本的集合运算&#xff0c;这些运算可以用于处理集合中的数据。以下是Python集合的常见运算&#xff0c;包括并集、交集、差集和对称差集等&#xff0c;并提供代码示例来显示其用法。 并集 (Union) 并集是两个集合中所有唯一元素的结合&a…...

netsuite查询货品库存

//单品可用数量获取var inventorySearch search.create({type: inventoryitem,filters: [[internalid, is, lineItem2.nsSkuId] // 根据 SKU ID 进行筛选],columns: [search.createColumn({name: locationquantityavailable,summary: SUM}) // 获取可用库存总和]});var result…...

Java 实现分页的几种方式详解

目录 分页概述Java实现分页的几种方式 手动分页基于JDBC的分页基于Hibernate的分页基于MyBatis的分页[基于Spring Data JPA的分页](#基于Spring Data JPA的分页)使用PageHelper插件的分页 分页中的注意事项总结 分页概述 分页是指将大量数据分成若干小块&#xff0c;每次只显…...

vite构建vue3项目hmr生效问题踩坑记录

vite构建vue3项目hmr生效问题踩坑记录 hmr的好处 以下是以表格形式呈现的前端开发中HMR&#xff08;热模块替换&#xff09;带来的好处&#xff1a; 好处描述提升开发效率允许开发者在不刷新整个页面的情况下实时更新修改的代码&#xff0c;减少等待时间保持应用状态在模块替…...

区块链赋能民生大数据

区块链技术作为一种新兴的信息技术&#xff0c;其在民生大数据领域的应用正逐渐展现出巨大的潜力和价值。以下是对区块链赋能民生大数据的详细阐述&#xff1a; 一、区块链技术概述 区块链是一种去中心化、分布式账本技术&#xff0c;具有数据不可篡改、可追溯、公开透明等特…...

10 Vue 特性要点

Vue2 特性要点 Vue2 源码理解 Vue 双向数据绑定 先从单向绑定切入单向绑定非常简单,就是把Mode1绑定到view,当我们用Javascript代码更新Model时, view就会自动更新 双向绑定就很容易联想到了,在单向绑定的基础上,用户更新了View, Mode1的数据也自动被更新了 因为 Vue 是数据双向…...

ESP32和mDNS学习

目录 mDNS的作用mDNS涉及到的标准文件组播地址IPv4 多播地址IPv6 多播地址预先定义好的组播地址 mDNS调试工具例程mDNS如何开发和使用注册服务查询服务 mDNS的作用 mDNS 是一种组播 UDP 服务&#xff0c;用来提供本地网络服务和主机发现。 你要和设备通信&#xff0c;需要记住…...

学习SQL如何使用CASE语句查询分析设备状态

学习SQL如何使用CASE语句查询分析设备状态 一、前言1. 问题背景2. SQL查询分析3. SQL查询解析 二、结论 一、前言 在实际应用中&#xff0c;经常需要对设备的状态进行监控和分析。通过SQL查询&#xff0c;我们可以有效地从数据库中提取和计算设备的状态信息。本文将介绍如何编…...

Gartner发布2024年零信任网络技术成熟度曲线:20项零信任相关的前沿和趋势性技术

大多数组织都制定了零信任信息安全策略&#xff0c;而网络是零信任实施领域的顶级技术。此技术成熟度曲线可以帮助安全和风险管理领导者确定合适的技术&#xff0c;以将零信任原则嵌入其网络中。 战略规划假设 到 2026 年&#xff0c;15% 的企业将在企业拥有的局域网上用 ZTNA …...

React hook 之 useState

在组件的顶部定义状态变量&#xff0c;并传入初始值&#xff0c;确保当这些状态变量的值发生变化时&#xff0c;页面会重新渲染。 const [something,setSomething] useState(initialState); useState 返回一个由两个值组成的数组&#xff1a;1、当前的 state&#xff0c;在首次…...

jenkins中shell脚本中使用构建参数化Groovy变量的四种方式

jenkins中shell脚本中使用构建参数化Groovy变量的四种方式: 以字符变量为例&#xff1a; 流水线代码&#xff1a; pipeline {agent {//label "${server}"label "${28}"}stages {stage(Hello) {steps {echo Hello Worldecho "${28}"echo "…...

Robot Operating System——ParameterEventHandler监控Parameters的增删改行为

大纲 创建订阅"/parameter_events"的Node监控自身Node内部Parameter监控自身Node外部Parameter监听所有Node的所有Parameter的变动执行效果总结 在《Robot Operating System——AsyncParametersClient监控Parameters的增删改行为》一文中&#xff0c;我们通过AsyncPa…...

计算机网络(Wrong Question)

一、计算机网络体系结构 1.1 计算机网络概述 D 注&#xff1a;计算机的三大主要功能是数据通信、资源共享、分布式处理。&#xff08;负载均衡、提高可靠性&#xff09; 注&#xff1a;几段链路就是几段流水。 C 注&#xff1a;记住一个基本计算公式&#xff1a;若n个分组&a…...

Docker+consul容器服务的更新与发现

1、Consul概述 &#xff08;1&#xff09;什么是服务注册与发现 服务注册与发现是微服务架构中不可或缺的重要组件。起初服务都是单节点的&#xff0c;不保障高可用性&#xff0c;也不考虑服务的压力承载&#xff0c;服务之间调用单纯的通过接口访问。直到后来出现了多个节点…...

全网最详细!! Linux 安装、配置教程

一、下载安装包 首先去官网下载VMware最新版本&#xff0c;以及发行版CentOS -7&#xff0c;懒得下载的可以私信我&#xff0c;我给你发包 其中&#xff0c;CentOS&#xff08;Community Enterprise Operating System&#xff09;是一个基于Linux的开源操作系统&#xff0c;它是…...

cocos creator 3学习记录01——如何替换图片

一、动态加载本地图片 1、通过将图片关联到CCClass属性上来进行代码切换。 1、这种方法&#xff0c;需要提前在脚本文件中声明好代表图片的CCClass属性。 2、然后拖动图片资源&#xff0c;到脚本内声明好的属性上以进行关联。 3、然后通过程序&#xff0c;来进行切换展示。…...

【Android Compose】ListView效果

【Android Compose】ListView效果 1、Column、Row 和 Box2、LazyColumn和LazyRow3、Compose 中的状态4、ListView效果5、android-compose-codelabs Jetpack Compose 使用入门 Jetpack Compose 教程 Jetpack Compose 1、Column、Row 和 Box Compose 中的三个基本标准布局元素是 …...

【Pytorch实战教程】Pytorch中.detach()的详细介绍

detach() 是 PyTorch 中用于分离张量的计算图的一个方法。它在处理计算图时非常有用,尤其是在需要停止梯度传播的情况下。以下是 detach() 方法的详细介绍: 方法概述 detach() 方法返回一个新的张量,从当前计算图中分离出来,即返回的张量不会参与梯度计算。这在某些情况下…...

AR 眼镜之-充电动画定制-实现方案

目录 &#x1f4c2; 前言 AR 眼镜系统版本 充电动画 1. &#x1f531; 技术方案 1.1 方案介绍 1.2 实现方案 关机充电动画 亮屏/锁屏充电动画 2. &#x1f4a0; 关机充电动画 2.1 关机充电动画核心处理类与路径 2.2 实现细节 步骤一&#xff1a;1&#xff09;定制 …...

AJAX-XMLHttpRequest 详解

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 前言 XMLHttpRequest 概述 主要用途 工作流程 示例代码 GET 请求示例 POST 请求示例 注意事项 工作…...

内容管理系统 Contentful 与 Baklib

对于希望管理其产品和服务的在线文档或知识库以支持其客户和员工的组织来说&#xff0c;市场上有太多的平台和工具。 遵循的做法之一是使用无头内容管理系统 (CMS)。 如果您是这样的组织之一&#xff0c;正在考虑使用无头 CMS - Contentful 之一来管理您的在线知识库&#xff0…...