当前位置: 首页 > news >正文

仿RabitMQ 模拟实现消息队列项目开发文档2(个人项目)

项目需求分析

核心概念

现在需要将这个项目梳理清楚了,便于之后的代码实现。项目中具有一个生产消费模型:

其中生产者和消费者的个数是可以灵活改变的,让系统资源更加合理的分配。消息队列的主逻辑和上面的逻辑基本一样,只不过我现在要做的这个生产者不再是本地的一个线程了,而是一个客户端,消费者也是一个客户端,生产者就是消息发布客户端,而消费者就是消息订阅客户端,而中间的就不是线程安全的阻塞队列了,而是一个消息队列服务器:

消息队列服务器能够存储消息,消息发布客户端能够将消息发送到客户端上,而消息订阅客户端订阅了消息,消息服务器能够将消息推送给消息订阅客户端。这三个也是我要实现的三个部分。了解了框架下面来了解细节的信息:

AMQP(Advanced Message Queuing Protocol-⾼级消息队列协议(⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议,为⾯向消息的中间件设计,使得遵从该规范的客⼾端应⽤和消息中间件服务器的全功能互操作成为可能)模型中,也就是消息中间件服务器中,⼜存在以下概念:

• 虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是⼀个逻辑上的集合。⼀个 消息队列服务器上可以存在多个 VirtualHost

交换机 (Exchange): ⽣产者把消息先发送到 消息服务器的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue

队列 (Queue): 真正⽤来存储消息的部分, 每个消费者决定⾃⼰从哪个 Queue 上读取消息

绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多" 关系,使⽤⼀个关联表就可以把这两个概念联系起来

消息 (Message): 传递的内容 所谓的 Exchange 和 Queue 可以理解成 "多对多" 关系, 和数据库中的 "多对多" ⼀样. 意思是:

⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)

⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange)

对于虚拟机的说明后面设计的时候会有更加详细的说明,这里能理解就理解。

下面说明一下队列,假设这是一个新闻发布系统的服务器,新闻具有很多的类别:音乐新闻,体育新闻,音乐新闻,歌手新闻,歌曲新闻。现在不同的消息发布客户端就可以选择不同的类别去对应的模块上发布新闻,而订阅客户端则选择自己感兴趣的模块,然后服务器就会将发布客户端写的新闻存储到该新闻对应的队列中,然后将对应模块中的新闻推送给消息订阅客户端,但是如果一个新闻具有多个属性要怎么办呢?如果某一个新闻具有了多个属性,但是因为技术原因导致这个新闻只能推送到一个队列中,此时想要看到这个新闻的人就只能订阅更多的模块由此才能看到该新闻。这就很难受,由此就出现了交换机的概念。现在一个发布客户端发送的消息就会到达交换机(一个交换机会绑定几个队列),交换机内部会执行自己的匹配方法,然后发现这个新闻具有多个属性就会将这个新闻直接放入到多个队列中。图:

并且交换机也是存在不同类型的,假设某一个新闻具有全部的属性,那么这个新闻就需要广播到所有的队列中,此时就可以交给广播交换机。交换机的作用对消息的灵活转发起到一个促进作用。对于交换机的绑定需要注意,一个交换机没有必要将所有的队列都绑定了,毕竟队列肯定是很多的。交换机和队列之间的关系到底是一对一,还是一对多就是由这个绑定关系决定的。由此就能够知道一个消息队列服务器上是具有多个虚拟机的,而一个虚拟机内部是具有多个交换机的:

一个交换机内部的详细结构:

bingding就是绑定,Exchange就是交换机,所以一个客户端首先要决定的就是自己要访问的是哪一个虚拟机。上述的这些数据结构和数据,既要在内存中储存,也要在硬盘中储存(需要持久化)。总结一下:

核心API

对于消息队列服务器(Broker)来说需要通过下面的这些核心API,来实现消息队列的基本功能

创建交换机 (exchangeDeclare) 销毁交换机 (exchangeDelete) 创建队列 (queueDeclare) 销毁队列 (queueDelete) 创建绑定 (queueBind) 解除绑定 (queueUnbind) 发布消息 (basicPublish) 订阅消息 (basicConsume) 确认消息 (basicAck) 取消订阅 (basicCancel)

对于这些API后面会有更加细致的说明。

交换机类型

对于 RabbitMQ 来说, 主要⽀持四种交换机类型:

Direct

Fanout

Topic

Header

交换机决定了一个消息能够发布到哪一个队列中去,这里面有一个很关键的信息就是交换机的类型,因为Header交换机比较少见,就不说明了,主要说明前三种,这三种也是这个项目需要实现的交换机。

(直接交换)Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名 ,明确说明这个消息要给哪一个消息(比如在多人群中的专属红包)

(广播交换)Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中 (比如群发红包)

(主题交换)Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey。发送消息指定⼀个字符串为routingKey。当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列(类似于发⼀个画图红包, 发 10 块钱红包, 同时出个题, 得画的像的⼈,才能领。 也是每个领到的⼈都能领10块钱)

持久化

Exchange, Queue, Binding, Message 等数据都有持久化需求 当程序重启 / 主机重启, 保证上述内容不丢失

网络通信

因为这个项目并不是一个本机上的生产消费模型,所以无论是发布客户端要发送消息到服务器,还是服务器推送数据到客户端都会涉及到网络通信的功能需求。

⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器,通过⽹络进⾏通信。

创建 Connection 关闭 Connection 创建 Channel(信道---连接的细化,为了能够充分的利用资源,一个连接上可以具有多个信道,每个信道都都自己的任务,互不影响,但是底层使用的其实是一个接口)关闭 Channel 创建队列 (queueDeclare) 销毁队列 (queueDelete) 创建交换机 (exchangeDeclare) 销毁交换机 (exchangeDelete) 创建绑定 (queueBind) 解除绑定 (queueUnbind) 发布消息 (basicPublish) 订阅消息 (basicConsume) 确认消息 (basicAck) 取消订阅(basicCancel)

Connection 对应⼀个 TCP 连接

Channel 则是 Connection 中的逻辑通道

⼀个 Connection 中可以包含多个 Channel。Channel 和 Channel 之间的数据是独⽴的,不会相互⼲扰。这样做主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接。

Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆.

消息应答

被消费的消息需要进行应答,主要有两种应答模式:

  1. 自动应答:消费者在消费完消息后,系统会自动认为该消息已被应答,Broker 会直接删除该消息。
  2. 手动应答:消费者需要手动调用应答接口,Broker 只有在收到应答请求后,才会真正删除该消息。

手动应答的目的是确保消息确实被消费者成功处理。这种模式在对数据可靠性要求较高的场景中比较常见。这个应答机制就出现在现在一个消息已经被推送到了客户端之后,我怎么确定客户端收到了这个信息呢?由此就有了消息应答(消息确认机制)。

有了上面的知识再来看一下这幅图:

就会有更加深入的理解了,各个部件之间的逻辑关系。

模块划分

下面是关于整个服务端模块的简单说明划分:

服务端模块:

  1. 数据管理模块
  1. 交换机数据管理模块
  2. 队列数据管理模块
  3. 绑定数据管理模块
  4. 消息数据管理模块(以上四个模块分别实现数据的管理:增、删、查,以及持久化的存储)

  1. 虚拟机数据管理模块虚拟机其实就是交换机 + 队列 + 绑定 + 消息的整体逻辑单元。因此,虚拟的数据管理其实就是将以上四个模块的合并管理。

  1. 交换路由模块
  1. 消息的发布:将一条新消息发布到交换机上,由交换机决定放入哪些队列。决定交给哪个队列,其中交换机类型起了很大作用(直接交换、广播交换、主题交换)。
  2. 交换机类型:
  • 直接交换:思想简单。 上面已经进行了说明
  • 广播交换:思想简单。 同上
  • 主题交换:涉及到一个规则匹配的流程。
  1. 交换路由模块:专门做匹配过程的模块。
  1. 消费者管理模块消费者指的是订阅了某个队列消息的客户端。一旦这个队列有了消息,就会推送给这个客户端。在核心API中,有一个订阅消息的服务。请注意,这里的订阅不是订阅某条特定的消息,而是订阅了某个队列的所有消息。当前,该模块主要实现了消息推送功能。因此,一旦有了消息,系统就需要能够找到与这条消息相关的消费者信息(即消费者对应的信道)。

  1. 信道(通信通道)管理模块一个连接可能会对应有多个通信通道。一旦某个客户端需要关闭通信,关闭的不是连接本身,而是该客户端对应的通信通道。关闭信道时,我们需要取消客户端的订阅。

  1. 连接管理模块该模块负责管理网络通信对应的连接。因为当一个连接需要关闭时,应该把与该连接关联的所有信道都关闭。因此,数据管理部分至少要管理这些关联的信道。

  1. 服务端BrokerServer模块这个模块是对以上所有模块的整合,将它们整合成一个完整的服务器。

然后是两个客户端的简单说明

客户端模块:

  1. 消费者管理模块
  1. 一个订阅客户端在订阅一个队列消息时,就相当于创建了一个消费者。

  1. 信道管理模块
  1. 客户端的信道与服务端的信道是一一对应的。
  2. 服务端信道提供的服务,客户端都有。
  3. 相当于服务端为客户端提供服务,而客户端则为用户提供服务。

  1. 连接管理模块
  1. 对于用户来说,所有的服务都是通过信道完成的。
  2. 信道在用户的角度就是一个通信通道(而不是连接)。
  3. 因此,所有的请求都是通过信道来完成的。
  4. 连接的管理就包含了客户端资源的整合。

  1. 基于以上的三个模块封装实现
  1. 订阅客户端:订阅一个队列的消息,并处理收到的推送消息。
  2. 发布客户端:向一个交换机发布消息。

理解了这个项目的框架,再去理解各个功能模块,为什么这些功能模块最后能够整合出整体的服务框架,下面再来一个一个的比较详细的介绍这些

服务端模块

持久化数据管理模块

首先来看一下交换机要管理的数据有哪些

交换机数据管理:
  1. 要管理的数据:描述了一个交换机应该具备哪些数据
  1. 交换机名称:唯一标识
  2. 交换机类型:决定了消息的转发方式
  • 在每个队列绑定中,有个binding_key;每条消息中,有个routing_key
  • 直接交换:若binding_key与routing_key相同,则将消息放入队列
  • 广播交换:将消息放入交换机绑定的所有队列中
  • 主题交换:若routing_key与多个绑定队列的binding_key存在匹配规则,且匹配成功,则放入队列
  1. 持久化标志:决定了当前交换机信息是否需要持久化存储
  2. 自动删除标志:当关联了当前交换机的所有客户端都退出时,是否要自动删除交换机 (这个标志在这个项目中并没有实现,但是是可以进行扩展的功能)
  3. 交换机的其他参数:当前未使用

然后就是对交换机本身进行的管理操作了

对交换机的管理操作:

  1. 创建交换机:
  1. 本质上需要的是声明,体现强断言的思想。
  2. 若有则保留,若无则创建。
  1. 删除交换机:
  1. 注意事项:每个交换机都会绑定一个或多个队列(意味着会有一个或多个绑定信息)。
  2. 因此,删除交换机时需要先删除相关的绑定信息。
  1. 获取指定名称交换机。
  2. 获取当前交换机数量。

以上就是对交换机要管理的数据,以及对交换机本身能够进行的操作了。

下面是队列要管理的数据,以及对队列本身能够进行的操作。

队列数据管理

一、要管理的数据

  1. 队列名称:唯一的标识
  2. 持久化存储标志:
  1. 决定了是否将队列信息持久化存储起来
  2. 决定了重启后,这个队列还是否存在
  1. 是否独占标志:
  1. 独占指的是,只有当前客户端自己能够订阅队列消息
  1. 自动删除标志:
  1. 当订阅了当前队列的所有客户端退出后,是否删除队列(暂未实现)
  1. 其他参数:(暂未使用)

二、提供的管理操作

  1. 创建队列
  2. 删除队列
  3. 获取指定队列信息
  4. 获取队列数量
  5. 获取所有队列名称:这个接口很重要原因如下
  1. 当系统重启后,需要重新加载数据
  2. 加载历史消息(消息以队列为单元存储在文件中)
  3. 加载消息需要知道队列名称,因为后续消息存储时,存储文件以队列名称命名

所以持久化存储标志的是否设定,也就是一个队列的消息是否需要持久化,最重要的指标就是这个队列的信息是否需要持久化,如果队列管理的数据进行了持久化,但是这个队列本身的信息没有持久化,那么重新加载后这个队列都没有了,还有必要进行重新加载吗?没有必要,因为此时根本没有人会去订阅这个队列,那么这个队列中的信息也就没有必要持久化了。

有了队列的信息,后面在实现功能的时候我才能知道这个队列和哪一个交换机绑定起来了。

下一个模块:

绑定数据管理模块:

绑定数据管理模块

描述:该模块用于管理哪个队列与哪个交换机之间的绑定关系。

管理的数据:

  1. 交换机名称
  2. 队列名称
  3. binding_key:绑定密钥,用于描述在交换机的主题交换或直接交换中的消息发布匹配规则。它由数字、字符、下划线(_)、井号(#)和逗号(,,注意此处原文中逗号可能是误打,通常应为点号或省略)组成。
  1. 示例:binding_key: news.music.#
  2. 消息中的routing_key示例:news.sport.football 此时在匹配的时候,就无法匹配上,但是如果这里的key位news.music.pop(流行音乐),那么就会成功完成匹配

管理的操作:

  1. 添加绑定
  2. 解除绑定
  3. 获取交换机相关的所有绑定信息:
  1. 在删除交换机时,需要删除与其相关的绑定信息。
  2. 当消息发布到交换机时,交换机通过这些绑定信息将消息发布到指定的队列。
  1. 获取队列相关的所有绑定信息:
  1. 在删除队列时,需要删除与其相关的绑定信息。
  1. 获取绑定信息数量

下一个是消息数据管理:

消息数据管理:

描述消息对其进行什么样的管理操作,以及这个消息里面的属性都是些什么(和上面的那些一样的)。

首先来看消息的属性是什么:

消息属性:

  • ID:消息的唯一标识
  • 持久化标志:表示是否对消息进行持久化(还取决于队列的持久化标志)
  • routing_key:决定了当前消息要发布的队列(消息发布到交换机后,根据绑定队列的binding_key决定是否发布到指定队列)
  • 消息主体:消息内容

附加信息(服务端为了管理所添加的信息):

  • 存储偏移量:消息以队列为单元存储在文件中,这个偏移量是当前消息相对于文件起始位置的偏移量。
  • 消息长度:从偏移量位置取出指定长度的消息(解决粘包问题)。
  • 是否有效标志:标识当前消息是否已经被删除(删除一条消息,并不会每次直接将后边的数据拷贝到前边,而只是重置了标志)。当一个文件中有效消息占据总消息比例不到50%,且数据量超过2000时,则进行垃圾回收,重新整理文件数据存储。当系统重启时,也只需要重新加载有效消息即可(相当于进行了一次垃圾回收)

然后是消息的管理信息(这些信息都是为了让队列能够更好的管理消息)

管理方式:以队列为单元进行管理(因为消息的所有操作都是以队列为单元的)。

管理数据详情:

  1. 消息链表:
  1. 功能:保存所有的待推送消息。
  1. 待确认消息Hash:
  1. 功能:消息推送给客户端后,会等待客户端进行消息确认。
  2. 流程:收到确认后,才会真正删除消息。
  1. 持久化消息Hash:
  1. 假设:消息都会进行持久化存储。
  2. 注意事项:
  • 操作过程中会存在垃圾回收操作。
  • 垃圾回收会改变消息的存储位置,但内存中的消息会存储消息的实际存储位置,因此垃圾回收后可能导致位置不一致。
  1. 更新机制:每次垃圾回收后,都需要用新的位置去更新持久化消息的信息。
  1. 垃圾回收:
  1. 步骤:
  • 将有效消息读取出来。
  • 重新截断文件,将消息连续写入文件中(确保文件中都是有效消息)。
  1. 持久化统计信息:
  1. 持久化的有效消息数量:记录当前持久化存储的有效消息总数。
  2. 持久化的总的消息数量:记录自系统启动以来持久化存储的总消息数,该数值决定了何时进行垃圾回收。
  3.   然后是对消息本身能够进行的管理操作:

消息管理操作:

  1. 向队列新增消息:
  1. 功能:将新消息添加到队列中。
  2. 实现:新消息会被加入到待推送消息链表中。
  1. 获取队首消息:
  1. 功能:从队列中获取最前面的消息。
  2. 实现:获取消息后,该消息会从待推送消息链表中删除,并加入到待确认消息中。
  3. 注意:此时消息不再是待发送状态,而是待确认状态。
  1. 对消息进行确认:
  1. 功能:确认消费者已经成功接收并处理了消息。
  2. 实现:从待确认消息中移除该消息,并进行持久化数据的删除操作。
  1. 恢复队列历史消息:
  1. 功能:在系统重启时恢复队列中的历史消息。
  2. 实现:主要在构造函数中进行,确保系统重启后能够恢复之前的消息状态。
  1. 垃圾回收(由消息持久化子模块完成):
  1. 触发条件:持久化文件中有效消息比例小于50%,且总消息数量超过200条。
  2. 实现:进行垃圾回收操作,清理无效消息,优化存储。
  1. 删除队列相关消息文件:
  1. 功能:当队列被删除时,删除其相关的消息文件。
  2. 实现:确保队列删除后,其消息文件也被相应删除,避免占用存储空间。
  3.   最后就是消息以队列为单位的管理操作:
  4.   队列消息管理
  5. 初始化队列消息结构:
  1. 在队列创建时,初始化其消息结构,准备存储消息。
  1. 移除队列消息结构:
  1. 在一个队列被删除时调用,清理并移除该队列的消息结构。
  1. 向队列新增消息:
  1. 功能:允许向队列中添加新的消息。
  2. 实现:新消息会被添加到队列的消息结构中,等待后续处理。
  1. 对队列消息进行确认:
  1. 功能:确认消费者已经成功接收并处理了队列中的消息。
  2. 实现:从队列的消息结构中移除已确认的消息,确保消息不会被重复处理。
  1. 恢复队列历史消息:
  1. 功能:在系统重启或恢复时,重新加载并恢复队列中的历史消息。
  2. 实现:从持久化存储中读取历史消息,并重新构建队列的消息结构。
  1.   以上是根据数据和数据之间的关系摸索出的几个关系,要不然管理起来很难,以上的消息管理简化成一个图如下:
  2.   队列中的就是消息。
  3.   下一个就是虚拟机数据管理了。经过之前的学习,我们已经知道了虚拟机就是队列,交换机,绑定和消息的集合体,所以要管理好虚拟机就要管理好这些数据。
  4.   要管理的数据:
  5. 交换机数据管理句柄
  6. 队列数据管理句柄
  7. 绑定信息数据管理句柄
  8. 消息数据管理句柄
  9.   要管理的操作:
  10. 声明/删除交换机:
  1. 在删除交换机时,需同时删除与其相关的绑定信息。
  1. 声明/删除队列:
  1. 在删除队列时,需同时删除与其相关的绑定信息及消息数据。
  1. 队列的绑定/解除绑定:
  1. 绑定时,需确保交换机和队列均存在。
  1. 获取指定队列的消息:
  1. 提供从指定队列中检索消息的功能。
  1. 对指定队列的指定消息进行确认:
  1. 允许对指定队列中的特定消息进行确认处理。
  1. 获取交换机相关的所有绑定信息:
  1. 当一条消息需要发布到指定交换机时,交换机通过获取所有绑定信息来确定消息应发布到哪个队列

下一个模块是路由匹配模块:

这个模块就是用来判断一个消息是否能够发布到指定的队列中的。所以这个模块理论上是没有需要管理的数据的。

路由匹配模块

概述:

路由匹配模块负责决定一条消息是否能够发布到指定的队列。它通过比较消息的发布规则(routing_key)与队列的发布匹配规则(binding_key)来实现这一功能。

关键概念:

  • binding_key:队列的发布匹配规则,用于确定消息是否能够发布到该队列。(每个队列和交换机的绑定信息中,都有一个binding_key,这是队列发布的匹配规则)
  • routing_key:消息的发布规则,与binding_key进行匹配以决定消息的去向(在每条即将发布的信息中,都有一个routing_key,这个是消息的发布规则)。
  • 交换机类型:
  • 广播:直接将消息发布给交换机的所有绑定队列。
  • 直接:routing_key与binding_key完全一致时,匹配成功。(一般来说如果遇到了这种发布规则,routing_key会被设置为队列名,而binding_key也是一样)
  • 主题:binding_key中包含匹配规则(如​​news.music.#​​​),routing_key符合这些规则时(如​​news.music.pop​​),匹配成功。

功能:

路由匹配模块本质上不直接管理数据,而是提供路由匹配操作接口:

  1. 判断routing_key与binding_key是否匹配:
  1. 提供一个接口,用于判断给定的routing_key与binding_key是否能够匹配成功。
  1. 判断routing_key是否符合规定:
  1. 格式约定:routing_key只能由数字、字母、逗号(,)、点(.)构成。
  2. 提供验证功能,确保routing_key符合规定的格式。
  1. 判断binding_key是否符合规定:
  1. 格式约定:binding_key只能由数字、字母、点(.)、井号(#)、星号(*)构成。
  2. 提供验证功能,确保binding_key符合规定的格式。这些特殊字符在主题交换机类型中具有特定的匹配含义。

下一个消费者管理模块

消费者管理模块

概述:

消费者管理模块涉及两种类型的客户端:发布消息和订阅消息。只有订阅了指定队列消息的客户端才被视为消费者。

消费者数据存在的意义:

当指定队列有消息时,需要将消息推送给这个消费者客户端(推送时需要找到与该客户端相关的信息,如连接)。

消费者信息:

  1. 消费者标识(tag):用于唯一标识消费者。
  2. 订阅队列名称:当当前队列有消息时,会推送给这个客户端;同时,当客户端收到消息后,需要对指定队列的消息进行确认。
  3. 自动确认标志:
  1. 自动确认:推送消息后,直接删除消息,无需额外确认。
  2. 手动确认:推送消息后,需要等待收到确认回复后再删除消息。
  1. 消费处理回调函数指针:当队列有一条消息时,通过这个函数进行处理(函数内部逻辑固定,即向指定客户端推送消息)。

消费者管理:

管理思想:

以队列为单元进行管理。每个消费者订阅的都是指定队列的消息,消费者对消息进行确认也是以队列为单位进行确认。最关键的是,当指定队列中有消息时,会获取订阅了这个队列的消费者信息进行消息推送。

队列消费者管理结构:

  • 数据信息:消费者链表,用于保存当前队列的所有消费者信息。采用RR(轮转)策略,每次取出下一个消费者进行消息推送(一条消息只需要被一个客户端处理即可)。

管理操作:

  1. 初始化队列消费者结构:创建并初始化队列消费者管理结构。
  2. 删除队列消费者结构:释放队列消费者管理结构所占用的资源。
  3. 向指定队列添加消费者:将新的消费者添加到指定队列的消费者链表中。
  4. 获取指定队列消费者:获取指定队列的消费者链表(或其中的某个消费者信息)。
  5. 删除指定队列消费者:从指定队列的消费者链表中移除指定的消费者。

此外,还包括一些其他的管理操作,如获取队列消费者数量、判断队列消费者链表是否为空等。

下一个就是信道的管理了,之前就说明过,信道其实就是一个连接的进行细分(为了更加充分的利用资源)而下面就是对信道管理的更加细节的说明:

信道管理: Channel

信道是网络通信中的一个概念,叫做通信通道。

  • 网络通信的时候,必然都是通过网络通信连接来完成的,为了能够更加充分的利用资源,因此对通信连接又进行了进一步的细化,细化出了通信通道。
  • 对于用户来说,一个通信通道,就是进行网络通信的载体,而一个真正的通信连接,可以创建出多个通信通道。
  • 每一个信道之间,在用户的眼中是相互独立的,而在本质的底层它们使用同一个通信连接进行网络通信。
  • 因此,因为信道是用户眼中的一个通信通道,所以所有的网络通信服务都是由信道提供的。

信道提供的服务操作:

  1. 声明/删除交换机
  2. 声明/删除队列
  3. 绑定/解绑队列与交换机
  4. 发布消息/订阅队列消息/取消队列订阅/队列消息确认

信道要管理的数据:

  • 信道ID(信道的ID,这样以便于区分不同的信道,哪一个信道来了一个新的消息,便于我们快速的找到这个信道,找到对应信道的信息以进行操作)
  • 信道关联的虚拟机句柄
  • 信道关联的消费者句柄:当信道关闭的时候,所有关联的消费者订阅都要取消,相当于删除所有的相关消费者。
  • 工作线程池句柄:信道进行了消息发布到指定队列操作之后;从指定队列获取一个消费者,对这条消息进行消费。也就是将这条消息推送给一个客户端的操作交给线程池执行。

说明:

  • 并非每个信道都有一个线程池,而是整个服务器有一个线程池,大家所有的信道都是通过同一个线程池进行异步操作而已。

对信道的管理:

1.创建信道

2.关闭一个信道

3.获取指定信道的信息(对信道的操作其实就是增删查)

连接的管理

到这一块的时候细节方面已经差不多了,后面更多的是在进行整合。

概念: 网络通信连接

在网络通信模块中,我们使用muduo库来实现底层通信,muduo库中本身就有Connection连接的概念和对象类。

但是我们的连接中,还有一个上层通信信道的概念,这个概念在muduo库中是没有的。

因此,我们需要在用户的层面,对这个muduo库中的Connection连接进行二次封装。形成我们自己所需的连接管理。

管理数据:

  1. muduo库的通信连接
  2. 当前连接关联的信道管理句柄

连接提供的操作:

  1. 创建信道
  2. 关闭信道

管理的操作:

  1. 新增连接
  2. 关闭连接
  3. 获取指定连接信息
Broker服务器模块

这个模块其实就是对上面所有功能模块的整合,将所有的功能模块整合到一起,来形成一个消息队列服务器,所以在这个模块中更多提供的是管理的信息,而不是管理的操作

整合以上所有模块,并搭建网络通信服务器,实现与客户端网络通信,能够识别客户端请求,并提供客户端请求的处理服务。

管理信息:

  • a. 虚拟机管理模块句柄(虚拟机管理模块总的句柄)
  • b. 消费者管理模块句柄(每一个信道中都有一个自己关联的消费者,这里的是一个总的消费者的管理)
  • c. 连接管理模块句柄(将所有的连接管理起来)
  • d. 工作线程池句柄(整个服务器有一个工作线程池,而不是每一个信道有一个,只不过是一个信道有了消费者之后,会将这个线程池给这个信道让其能够操作线程池而已)
  • e. muduo库通信所需元素(这些元素其实就是muduo库网络通信的时候有多少个connection,必须要有一个协议处理的操作)

再对上面的信息进行一个总结:

Broker服务器模块:

这个模块是一个功能整合模块,本质上这个模块并不提供实质的功能性操作。

这个模块最重要的是资源的整合,是一个资源的载体

  1. 工作线程池:一个服务器有一个工作线程池,其他所有的信道操作都是这同一个线程池。
  2. 虚拟机:一个服务器有一个虚拟机,其他所有交换机,队列,绑定,消息的操作都是针对这个虚拟机进行的。
  3. 消费者管理:一个服务器有一个消费者管理。
  4. 通信相关连接管理:协议处理模块句柄,也是一整个服务器有一套。

客户端模块

客户端拥有的模块和服务端有些是差不多的,因为某些服务服务端提供给了客户端,而客户端也要提供给用户。当然也是存在不同的(比如连接管理模块)。

1.消费者管理模块:

  1. 消费者管理模块
  • 消费者标识
  • 订阅的队列名称
  • 自动确认标志
  • 消息回调处理函数指针

当当前消费者订阅了某一个队列的消息,这个队列有了消息后,就会将消息推送给这个客户端,这时候收到了消息则使用回调函数进行处理,处理完毕后根据确认标志决定是否进行消息确认。

管理操作:

  • 增删查

2.信道管理模块

所有提供的操作与服务端雷同,因为客户端给用户要提供什么服务,服务器就要给客户端提供什么服务管理信息:

  1. 信道ID
  2. 消费者管理句柄:每个信道都有自己相关的消费者
  3. 线程池句柄:对推送过来的消息进行回调处理,处理过程通过工作线程来进行
  4. 信道关联的连接

信道提供的服务:

  1. 声明/删除交换机
  2. 声明/删除队列
  3. 绑定/解绑队列与交换机
  4. 发布消息/确认消息
  5. 订阅队列消息/取消订阅队列消息
  6. 创建/关闭信道

信道的管理: 信道的增删查

  1. 连接管理模块

客户端连接的管理,本质上是对客户端TcpClient的二次封装和管理。

面对用户: 不需要有客户端的概念,连接对于用户来说就是客户端,通过连接创建信道,通过信道完成自己所需服务。因此,当前客户端这边的连接,对于用户来说就是一个资源的载体。

管理操作:

  1. 连接服务器
  2. 创建信道
  3. 关闭信道
  4. 关闭连接

管理的资源:

  • 工作线程池
  • 连接关联的信道管理句柄
  1. 异步工作池模块
  2. TcpClient模块需要一个EventLoopThread模块进行IO事件监控
  3. 收到推送消息后,需要对推送过来的消息进行处理,因此需要一个线程池来帮助我们完成消息处理的过程

将异步工作线程模块单独拎出来的原因:

  • 多个连接用一个EventLoopThread进行IO事件监控就够了
  • 所有的推送消息处理也只需要有一个线程池就够了

并不需要:

  • 每个连接都有一个EventLoop
  • 每个信道的消息处理都有自己的线程池

有了这几个模块之后,首先要做的就是搭建一个异步工作池,然后创建连接(将异步工作池传递进去),然后通过连接建立信道,之后用户的所有操作通过信道完成。

到这里这个项目的所有模块就介绍完毕了,下面再使用图将这些模块整合起来即可

项目模块关系图

需要说明的是当一个消息来到的时候,确定给了某一个虚拟机上的交换机之后,这个交换机会将这个消息进行匹配对照知道这个消息要放到哪一个队列中,然后将这个信息放到队列中。在这个服务中,消息是以队列为单位进行保存的,并且是在内存和硬盘和数据库中都存有一份,当内存中某一个队列上的信息符合了垃圾回收进行了这个操作之后,会将垃圾回收后的这个队列新的位置持久化到硬盘和数据库中。然后就是整合这些模块的BrokerServer模块了,并且也是资源的具象化。这个模块会向客户端提供各种功能。而当客户端和服务端建立了信道之后(信道来自于连接)。客户端就会向上为用户提供不同的操作,当用户选择了某一个操作之后,客户端会通过信道向服务端进行通信,然后服务端就会执行对应的操作。而针对于不同的客户端,向用户提供的操作也是不同的,对于订阅客户端来说,如果自己订阅了某个队列之后,客户端会通过订阅客户端接口通过信道向服务端进行通信,发送的正是一个messege,服务端再去将这个信息找到虚拟机,在通过路由匹配选择合适的队列。

下面正式开始项目的实现。

项目的文件结构

demo文件夹中放的就是一些需要经过测试的代码文件,再测试完成之后再引入到主项目中,mqclient就是客户端模块了,mqcommon存放的是多个模块需要共用的模块比如线程池,protobuf,日志打印等等,mqtest下进行项目的单元测试,third就是第三方库的存放目录。

实用工具的完成

这些工具完成了之后,便于我们之后项目的书写,不需要之后再项目中使用的时候再回来写。第一个日志打印工具,便于我们通过打印出来的信息判断是否出错进行调试。

然后就是使用Helper工具的完成,这个工具下具有几个不同的类,第一个文件基础的操作类(删除,创建等等),sqlite基础操作类,字符串操作类,UUID生成器类。

日志工具的完成

这里我将代码的实现放到mqdemo下生成一个log文件夹下创建log.cpp,封装一个日志宏,通过日志宏进行日志的打印,再打印的信息前带有系统事件已经文件名和行号

比如 [14:54:47] [log.cpp:12] 打开文件失败。

为了得到文件名字和行号需要知道下面的两个宏:

_FILE__和__LINE__到时候编译器会将这两个宏替换成文件的名字和行号。

为了得到具体的事件需要使用到下面的函数:

第二个参数就是需要转化成的格式是什么样的,可以使用的格式如下:

这个函数的作用就是将时间按照一定的格式转化为一个字符串,那么上图中标记的这个结构体如何来呢?需要使用下面的这个函数:

这个函数会根据现在的系统时间返回一个struct tm结构体指针。

上图就是这个结构图中的内容。使用上面的结构体就能够完成时间的格式化了。将上面的接口使用起来写一个测试代码:

成功的打印出了我需要的信息。

但是不可能每一次我都使用这样的操作去进行日志打印,这样就会造成项目代码冗余过高,下面就来将这个代码封装为一个宏,并且将日志等级也添加上(某些日志等级并不需要进行信息的打印,而有的需要进行信息的打印)。

上面的这个代码就完成了对一个宏函数,并且这个宏函数会打印日志的等级。在等级大于ERR的时候才会进行打印。##__VA_ARGS__这个宏需要带上##是为了防止外部在进行参数传递的时候,只传入了一个字符串("hello world"),并没有后面的%d,导致LOG(lev_str,level, format, ...)中format被匹配之后,后面的,没有被匹配导致的错误。

运行一下:

运行成功然后将这个代码拷贝到项目的公共模块下

为了防止这个模块被重复包含再去定义一个条件编译

当然上面的日志打印只是一个简单的权宜之计,因为实现一个真正的日志系统很复杂,并且是存在第三方库可以使用的,但是如果再增加的话,就让项目变得更加复杂了,为了简单化项目,所以就使用了这个日志系统。

sqit工具类的完成

这个工具类其实在之前教学使用sqit3数据库的时候已经实现了,直接将之前使用的代码拿过来,然后将日志添加到之前写的代码中即可。

这样就完成了。

字符串分割类的完成

这里需要知道的是字符串分割的完成思想就是遍历字符串,找到分割符,然后获取答案。

这样就完成了这个代码,然后再将这个代码封装为一个类函数,然后放到工具文件中

uuid生成工具类的完成

因为发送的每一条信息都需要有一个独一无二的id,所以此时就需要使用一个uuid的生成器来保证每一个消息都具有自己的uuid了。在这⾥,uuid⽣成,我们采⽤⽣成8个随机数字,加上8字节序号,共16字节数组⽣成32位16进制字符 的组合形式来确保全局唯⼀的同时能够根据序号来分辨数据(加上8字节序号是为了更加容易分辨,因为肉眼分辨随机数很难)。

这里我们先来解决随机数的问题:

c++中具有一些类能够生成随机数。

这个类生成的随机数是一个机器随机数,机器随机数就是通过硬件生成的随机数,生成这个随机数的效率比较低,因为速度比较慢,这里我们需要生成8个随机数,如果都使用这个方法,那么对于每一个消息都需要一个随机数的情况来说,会拖慢运行速度,所以这里并不能完全使用这个机器随机数。而在官方库中也提供了一个生成随机数的函数,但是生成的随机数是一个伪随机

这个函数就是使用了梅森旋转算法(Mersenne Twister)的函数,19937是数据的范围说明生成的随机数范围在2^19937-1这个范围内,范围越大越不容易重复。这个函数和之前使用过的random函数一样需要使用一个随机数种子,否则生成的随机数就可能出现重复的,这里我们就使用上面的机器随机数,作为这个函数的种子来生成随机数。

解决方法:通过机器随机数作为种子,来生成一个伪随机数。

这样就通过梅森旋转算法生成了伪随机数,相比于生成机器随机数,这个函数的效率就高很多了。但是现在还有一个问题,那就是我只需要8个0到255之间的随机数啊,进行mod操作吗,不需要因为标准库中也给我们提供了一个在数据区间取数据的函数

看这个例子

首先通过限定范围构造了一个对象出来,然后再根据生成的随机数进行区间取值

之后生成的随机数都是在这个范围内的,这样就能够解决获得8个随机数(0到255范围)。下面就是要将这个生成的8个随机数变成16进制的数字字符了

但是也可能出现下面这样的字符:

因为生成的这个随机数大小只有这么大,所以之后再生成的时候0就被忽略了。此时就需要设置位宽,和填充的数字了。

这样即使随机数生成的这个数字只是一位的也会具有两个位宽。

这里因为我们需要的是8个随机数,所以需要进行8次循环。stringstream这个类会将生成的数据存放在自己的缓冲区中,所以在前面直接不断的进行放置即可。

这样就得到了8个随机数生成的16位的字符串,这样生成的数据的唯一性已经很强了,但是我想要修改一下格式(按照844的格式),也就是让这个字符串生成的第8,12,16个字符的后面增加-。所以也就是在生成第4,6,8个字符串的时候往后追加即可。

这样生成的随机数更加便于观察。然后还需要一个编号,这里使用一个原子的静态变量即可,静态变量保证了编号是随程序在进行累加的,而原子保证了每一个信息拥有的编号是唯一的。

这样就生成了一个我需要的uuid。这里的思想就是生成8个1字节随机数,再取8字节的序号,组成16字节的数据,然后转化为16进制字符,共32个再使用这个字符组成一个特定格式的id就是这个方法的思想。

然后需要将这个方法放到工具类中。然后进行测试:

可以看到右边的序号是不断的递增的过程

文件基础操作类

这个类中需要完成的功能为以下几个:

1.判断文件是否存在 2.文件大小获取 3.文件读写 4.文件创建/删除 5.目录创建/删除

这个类我就不在demo中做了,而是直接写在helper文件中了。首先完成这个类的大体框架搭建。

上图代码中read函数body类型(从文件特定的位置进行读取的那个read接口)错了,应该是char*类型而不是string类型,因为在写read函数的时候发现需要使用的接口,需要使用char*,同时write的那个函数的string类型也需要修改为const char*(写入文件的特定位置的那个write函数)

然后就是去一个一个完成这些函数了。

判断文件是否存在可以使用下面这个函数:

这个函数第一个参数就是你要判断的文件的文件路径,第二个是mode,在下面的描述中如果mode被设置为了F_OK并且你要判断的这个文件存在那么这个函数的返回值就是0。但是这个函数的头文件是unistd.h也就意味着这个函数是一个系统调用,并且是在Linux下使用的,也就不具备跨平台性。所以不使用这个函数,转而使用下面这个函数stat

这个函数的头文件是sys/stat.h上图中的unsitd.h并不是这个函数的返回值,这个函数会获取一个文件结构体这个结构体中的信息:

文件的inode节点号,文件的设备号,文件的大小,访问时间等等,都在结构体中。大多数情况下都能够获取到一个文件的属性,如果没有获取到这个文件的属性,说明这个文件不存在,,获取文件属性成功返回0否则不返回0。并且这个接口具有一定的跨平台性,所以就是用这个函数来完成判断文件是否存在的函数。由此就能够完成exists函数

如果获取文件的大小呢?之前不是说了吗?在stat函数的这个结构体中就具有这个文件大小的属性,可以通过这一点来获得文件的大小。

对于读文件,首先来完成读一个文件的特定部分,要完成这一步需要按照下面的步骤来进行:

第一步:打开文件,第二步:跳转到文件读写位置,第三步:读取文件数据,第四步关闭文件。

打开文件使用文件流即可完成

std::ifstream

使用这个文件流需要包含头文件<fstream>同时这个文件流还具有以下的选项:

  1. std::ios::in​:
  1. 以读取模式打开文件(默认模式)。如果文件不存在,将会打开失败。
  1. std::ios::out​:
  1. 以写入模式打开文件。如果文件已经存在,则会清空文件内容。如果文件不存在,则会创建新文件。
  1. std::ios::app​:
  1. 以追加模式打开文件。写入操作将会添加到文件末尾,而不会清空文件内容。
  1. std::ios::trunc​:
  1. 如果文件已经存在,打开文件时会清空文件内容。通常与 ​​std::ios::out​
  1. std::ios::binary​:
  1. 以二进制模式打开文件。默认情况下,文件以文本模式打开。使用二进制模式可以处理非文本文件(如图像、音频等)。
  1. std::ios::ate​:
  1. 打开文件后,将文件指针移动到文件末尾。这允许你从文件末尾开始读取。
  1. std::ios::nocreate​:
  1. 仅在文件存在时打开,不创建新文件。
  1. std::ios::no_trunc​:
  1. 如果文件存在,则不清空文件内容。

然后就来完成这个接口:

这个接口中并没有太多的校验,需要用户在使用的时候注意(比如判断len值等等),所以需要用户自己进行注意,否则会崩溃。

下一个read接口,复用这一个read接口即可:

这个代码中的&body[0]就是将body中的首位地址取出来。这里如果不调整空间大小那么body中的空间就是0,然后后面的函数直接使用body中的空间就会出现访问未定义空间的问题

下一个write函数:

第一步:打开文件,第二步:跳转到文件指定位置 第三步:写入文件 第四步:关闭文件,这里有一个小知识点,要进行跳转,必须要具有文件的读权限(所以下面的代码中会将读权限也构建好)。

并且因为是写入文件,并且要进行跳转所以使用的类为ostream,整体的思路和read函数是一样的:

另外一个write函数直接复用这个write函数即可。

但是在下面这种情况下是会存在一个大问题的:如果一个文件中是存在数据的(100字节)。那么如果调用上面的函数将50字节的数据从这个文件中进行写入,那么这个文件的大小任然是100,只不过前50个字节被覆盖成为了我写入的文件,但是后50字节的数据还是存在的,此时我们再处理数据的时候就会出现问题了。但是我这里就不解决这个问题了,如果遇到这种情况了,将这个文件删除然后重新再去写入这个文件即可。

这里我再给这个类增加一个接口(修改文件的名字)

这个接口后面会有用处。

现在去完成下一个接口,获取一个文件的父级路径,什么是父级路径呢?假设现在一个文件的路径为/ww/dd/ff/des.txt这个des文件的父级路径就是/ww/dd/ff,要想获得这个路径只需要从这个路径字符串从后往前走找到最后一个/然后从开头开始截取字符串即可。而在string中恰好有一个就是从后往前找一个字符的接口。

但是如果外部传入的就是这个文件名字呢?此时我们去寻找/是找不到的,此时直接返回./,当前目录不正是这个文件的父级目录吗?

下一个函数,修改文件名字的函数,这个函数要如何完成呢?难道是获取储存有这个文件的结构体,然后修改结构体中的内容吗?或者是删除原始文件,然后创建一个新的文件?其实都不用存在一个接口能够完成修改文件名字的操作

专门用来修改文件名字的函数,成功返回0,失败返回-1。所以修改文件名字的函数直接调用这个函数就可以了。

下一个创建文件的接口,这个接口的实现思路也很简单,打开一个文件,如果这个文件不存在就创建,存在什么都不做直接关闭文件。

下一个删除文件接口,这个接口在标准库中依旧是提供了接口的。

这个函数内部封装的系统调用是unlink,这里为了移植性没有使用系统调用。

下一个接口创建目录,如果存在一个正常的路径让我去创建目录,那么也是存在一个函数可以让我们使用的:

但是如果在创建目录的时候,某一个目录的父级目录不存在呢?所以在创建多级目录的时候需要确保父级目录一定存在,也就是创建多级目录的时候,需要从第一个父级目录开始创建。所以我们从字符串中一个一个的去截取。最主要的是我们需要递归的去创建目录,所以每一次都要从path的0位置开始往后截取。

最后一个函数要移除目录,这里依旧存在一个函数:

但是这个函数的要求就是这个目录必须是空的,所以要删除这个目录首先就要删除这个目录中的文件,然后再去删除这个目录。虽然代码不难写,但是代码很多。但是不要忘了在Linux中有一个命令rm -rf能够完成这个功能,而在Linux中命令也就是一个文件,可以调用下面这个接口来执行这个文件,来执行这个命令。

这个函数就能够执行一个指令,还有一个方法就是进行程序替换,也是可行的,但是这里我就不使用这种方法了,因为执行程序替换要创建线程。代码如下:

这样这个文件基础操作类也就完成了。

下面就是测试这个文件基础操作类的功能了。

首先来测试文件是否存在,以及文件大小的测试,这里就测试之前写的logger.hpp文件

执行结果:

执行成功,文件大小也是正确的:

证明这两个功能是成功的,下一个功能文件的读写和文件目录的创建:

这里我首先在当前目录下创建一个aaa/bbb/ccc/tmp.txt文件,为了创建这个文件首先就要判断这个文件是否存在如果这个文件不存在,先去创建父级目录,最后再去创建文件。

代码:

执行后的结果:

成功创建了文件,以及目录。

下面再去测试文件的读取和写入功能。

运行结果:

最后再来看tmp中是否存在代码:

可以看到tmp中也是成功写入了代码。读写功能也就测试完毕了。

下面我要去读取tmp中的局部内容然后进行修改,这里我就读取__M_LOG_H这个宏,这个宏是从第8个字节开始读取9个字符。

运行结果:

局部读取成功。写一个局部的写入:

运行结果:

tmp中的内容:

现在局部的写入和读取也测试成功了。

还有几个功能:修改文件名字,删除功能的测试。

名字修改:

删除文件:

删除路径:

到这里所有的功能就测试完成了。

消息类型proto编写

因为这个项目涉及到了网络通信,也就会涉及到序列化和反序列化,所以这里需要先将消息类型proto文件写好,之后生成相关代码。既然要将消息类型的proto文件写好,就需要定义消息的类型。

消息本身要素:

i. 消息属性:

{

消息ID:唯一标识每条消息的标识符。

消息投递模式:选择消息的投递方式,可以是:

  非持久化模式

  持久化模式

消息的routing_key:用于消息队列中路由消息的关键字。

ii. 消息有效载荷内容:

}消息内部本身所包含的四个内容以上就是。

  • 这部分包含消息的具体数据,即消息需要传递的信息内容。

下面的额外要素是因为某些信息需要进行持久化,所以需要的元素

消息额外存储所需要素:

i. 消息的存储位置

  • 指定消息存储的具体路径或位置。

ii. 消息的长度

  • 消息内容的字节长度。

iii. 消息是否有效

  • 使用字符的​​0​​​或​​1​​​来表示消息的有效性,而不是使用布尔类型(​​bool​​),因为在持久化时,布尔类型所占的长度不同,可能会导致修改文件中消息有效位后消息长度发生变化

再加上客户端和服务端也会使用到一些交换机的信息,所以需要将交换机的信息也一起也到proto文件中。

1. 交换机类型

a. DIRECT

b. FANOUT

c. TOPIC

2. 消息投递模式

a. UNDURABLE:在RabbitMQ中,此模式的值为1,咱们也效仿

b. DURABLE :值为2.

如上就完成了消息的proto文件的编写,然后就是进行生成了。

成功生成了我们需要的文件。

项目功能服务端模块编写

交换机数据管理

要完成这个管理工作首先就要定义出一个交换机的数据类,交换机中绑定了多个队列,当发布客户端发送了消息之后,会交给交换机,然后由交换机来决定这个消息要进入哪一个队列中。

定义交换机数据类

a. 交换机名称 (唯一标识)

b. 交换机类型 (决定分发方法,广播:交给所有队列,直接交换:交给指定队列,主题交换:发送给符合匹配条件的队列)

c. 是否持久化标志

d. 是否⾃动删除标志 (当前项目最后没有实现)

e.其它参数

以上就是所有交换机都具有的数据。

又因为交换机中的是有可能需要进行持久化的,所以这里还需要定义一个交换机数据持久化类。

定义交换机数据持久化类(数据持久化的sqlite3数据库中)

a. 创建/删除交换机数据表

b. 新增交换机数据

c. 移除交换机数据

d. 查询所有交换机数据

e. 查询指定交换机数据(根据名称)

这个类提供了交换机的增删查功能,以及创建表的功能。

只有先定义好了交换机的数据类,然后才能进行交换机的管理类。

定义交换机数据管理类

a. 声明交换机,并添加管理(存在则OK,不存在则创建)

b. 删除交换机

c. 获取指定交换机

d. 销毁所有交换机数据

以上就是我们需要完成的功能。

下面我们就根据上面的功能将代码的框架搭建好,然后再去完成各个函数。

这里我后面又添加了一个无参的构造,因为后面需要使用到这个无参的构造。

上图中的setArgs和getArgs函数可以看作是数据库和map之间的序列和反序列函数。

下一个类框架交换机数据持久化管理类:数据保存在sqlite数据库中。

下一个交换机数据管理类:

这个数据管理类之后就会提供接口给外层,外层使用这些接口完成对交换机的管理。以上只是将框架完成了,为了完成这些函数,还需要写一些私有函数来帮助我们完成这些函数。

那么下面我们就来实现这些函数:第一个Exchange类中的setArgs函数,这个函数可以认为是Exchange和数据库之间的反序列函数,用于将从数据库中读取的数据,转化为可以放到map中的数据,另外一个函数就是getargs函数,这个函数和setArgs函数相反,是将交换机中的数据变成合适的字符串之后再放入到数据库中。

setArgs函数:

getArgs函数:

至于最后一个key=value的字符串需不需要带&则带也可以,不带也可以。没有影响,因为字符串分割的时候,最后一个&后面的\0是不会被截取进来的。

下一个交换机持久化数据管理类的构造函数,既然要进行持久化,那么要做的事情就是打开数据库,这里因为使用的是sqlite数据库所以首先要做的是确保这个数据库文件已经存在了,所以需要打开这个文件成功(没有会进行创建)之后再去进行数据库的操作。

这样这个类的构造函数就完成了。其中使用了几个后面使用到的函数。

下一个函数:创建/删除数据库表以及向表中插入/删除数据:这些函数的实现逻辑很简单就是写好一个sql语句,然后调用进行执行就可以了。

现在消息队列重启了,需要重新将硬盘中的数据加载到内存中,由此就引入了最后一个函数:

getall()函数这个函数会返回一个哈希表这个表中的数据就是需要加载到内存中的数据。

这个函数的思路也很简单,从表中进行数据的查询,然后将查询的数据放到一个map中,这里就需要一个私用函数了,这个函数用于将从数据库中查询到的信息拿取上来(从数据中储存的信息被保存再一个结构体的缓冲区中)现在需要使用这个函数将缓冲区中的数据拿上来,放到map中这个私有函数做的事情就是这个。

这样这个recover函数就完成了。

到这里交换机磁盘管理类就完成了。

最后就是交换机的数据管理类了,这个类因为上面两个类的完成就很简单了:

构造函数:

声明交换机:

删除一个交换机:

判断交换机是否存在

删除所有交换机:

选择了直接删除表和内存中的数据

获取某个交换机的信息:

到这里整个交换机的数据管理模块就完成了,要做的就只有进行测试了。

使用gtest进行测试。

其中第一个插入测试会插入几个交换机,其中map中的内容为k1=v1&k2=v2&

然后是运行结果:

成功执行了,如果出现了段错误,那么使用gdb进行调试。

下一个删除测试

在进行这一次测试之前,我将gtest测试组件中的析构函数中的会删除表和数据的函数进行了注释,让我在程序运行之后去数据库中检查exchange3是否被删除。

可以看到成功执行了,并且获得的也是空指针。去数据库中进行查询也是成功完成了删除。

还有一个exists检测一个交换机的功能需要进行测试:

运行结果:

所有结果都成功运行了。还剩下最后一个重要的功能也就是recover功能。

刚好现在我的数据库中还存在数据没有清理,这里我先将insert测试取消。然后再恢复了之后去查询exchange2这个交换机是否存在。

因为这一次不会进行插入,所以会直接从数据库的表中进行数据的恢复(ExchangeManper的构造函数中进行的,不需要我进行调用)。然后进行测试:

成功运行了,唯一一个没有通过的用例是因为哈希表中的值是无序的,导致出现了那样的字符串,但是还是符合=前和后分别是key和val。第一个=也不会影响后续的程序,因为对表中的数据没有造成影响:

到这里这个类的测试也就完成了,并且在大的逻辑上也没有问题。最后在进行一个测试那就是如果map中的值为空是否会出现问题呢?

进行下面的测试然后直接去查询表:

成功了,没有出现问题。到这里这个类就大体完成了。

队列数据管理

这个队列数据管理主要是服务端需要一个描述当前客户端创建了哪些队列的类。这样当客户端想要将信息发送到哪一个队列的时候,才能通过这个类中的标识知道当前信息能够推送到哪些队列中,所以队列数据管理本质上队列描述信息的管理,用于描述当前服务器上都存在哪些队列。

那么这个队列描述信息类中究竟存在哪些信息呢?

  1. 定义队列描述数据类

a. 队列名称

b. 是否持久化标志(队列中的信息是否需要放入到数据库中)

c.是否独占标志(没有实现,但是这个标志的作用是只有创建了这个队列的消费者才能去消费这个队列中的数据,其它人不允许消费)

d.是否自动删除标志(没有实现,如果对当前队列操作的所有消费者都已经退出了,那么这个队列是否需要自动进行删除)

e.其它参数

对当前项目来说真正用到的只有a和b其它的标志都是从RabitMQ源码中得来的,只不过

RabitMQ原本项目过于庞大这里没有继续实现了。既然有了描述信息类并且这个类也需要进行持久化所以和交换机一样:

  1. 定义队列数据持久化类(数据持久化的sqlite3数据库中)
  1.   a. 创建/删除队列数据表

b. 新增队列数据

c. 移除队列数据

  d. 查询所有队列数据(通过这个接口将磁盘中的信息读取出来,恢复到内存中)

  1. 定义队列数据管理类 (提供接口给外层使用)
  1.   a. 创建队列,并添加管理(存在则OK,不存在则创建)
  2.   b. 删除队列
  3.   c. 获取指定队列
  4.   d. 获取所有队列
  5.   e. 判断指定队列是否存在 (用于功能测试的接口)
  6.   f. 获取队列数量(用于功能测试的接口)
  7.   g. 销毁所有队列数据(用于功能测试的接口)

下面就是队列信息描述类的书写:

还有一个无参的构造上图忘了添加了,但是后面我添加了。

这个类需要从数据库中进行数据的读取,所以这个类和交换机数据类一样,需要两个格式函数,并且格式化方式是一样的,这里我直接将交换机数据类中的那两个类拿出来用了。

到这里第一个数据类就完成了。

持久化数据管理类的完成:

这个类需要对数据库进行操作,所以依旧会具有一个数据库的操作句柄,并且构造函数也需要传入数据库文件的路径。

首先来完善构造函数:

然后就是创建表函数了,逻辑就是写sql语句然后使用句柄去进行执行

然后是删除表的函数

依旧是写sql语句然后执行:

然后是插入数据的函数:

然后是删除某一个队列的函数:

最后一个函数是将数据库中的信息读取出来然后放入到内存中:

首先执行select语句,然后通过回调函数将读取到的信息放入到一个map中,最后返回map对象。

这样这个类也就完成了,这个代码和之前的交换机磁盘数据管理类可以说是一样的。到这里队列磁盘数据管理就完成了。

对外提供服务的队列数据管理类:

然后就是完成各种管理函数了:

构造函数:

声明一个queue的函数:

删除一个queue的函数:

返回一个指定队列的信息:

返回所有队列的信息就是将内存中的哈希表进行返回:

判断一个队列是否存在:

然后是放回当前队列的数量,已经删除所有队列信息的函数:

到这里整个队列数据管理类就完成了,下面就是进行测试了。

首先依旧是进行插入和查询的测试:

测试运行结果:

全部通过这也就证明了,查询和插入功能没有问题,下一个删除,判断是否存在的测试:

运行结果:

这里我没有清理表中的数据,此时去看数据库中的数据:

也是符合的。

然后就是测试恢复功能了。

这里直接去看能否读取到queue2,然后删除queue2,再去进行存在判断。

从运行结果可以看到,其它的测试都成功执行了,除了将哈希表中的数据变成字符串导致的无序让我这里测试没有通过以外。但是这不重要,上面的测试已经说明了恢复功能是没有问题的。

从没有删除的数据库表中查询是否还存在queueu2

果然也是不存在了.到这里测试也就完成通过了。

绑定信息数据管理

首先绑定信息描述了交换机和哪些队列之间具有关联关系的一个描述,在这个描述中会给双方建立一个映射关系。

当发布客户端发送消息到交换机之后,交换机通过这个绑定数据就能够知道这个交换机和哪些队列之间具有关联关系,然后根据匹配规则将消息发送到对应的队列中。

下面就是对于这个绑定信息类的定义了:

定义绑定信息类

a. 交换机名称

b. 队列名称

c. binding_key(分发匹配规则-决定了哪些数据能被交换机放⼊队列)

因为这个服务和交换机和队列有关,所以这个队列中的信息也要在数据库中进行持久化,所以这个类也是具有数据库持久化管理类的:

定义绑定信息数据持久化类(数据持久化的sqlite3数据库中,句柄依旧是数据库的句柄)

a. 创建/删除绑定信息数据表

b. 新增绑定信息数据

c. 移除指定绑定信息数据

d. 移除指定交换机相关绑定信息数据:移除交换机的时候会被调用(交换机都没有了那么绑定信息自然也需要进行删除)

e. 移除指定队列相关绑定信息数据:移除队列的时候会被调用(同上只不过是队列没有了)

f. 查询所有绑定信息数据:用于重启服务器时的数据恢复。

然后就是最后的管理内存和硬盘并对外提供服务的绑定信息数据管理类了:

定义绑定信息数据管理类

a. 创建绑定信息,并添加管理(存在则OK,不存在则创建)

b. 解除指定的绑定信息

c. 删除指定队列的所有绑定信息

d. 删除交换机相关的所有绑定信息

e. 获取交换机相关的所有绑定信息:交换机收到消息后,需要分发给⾃⼰关联的队列

f. 判断指定绑定信息是否存在 (用于测试)

g. 获取当前绑定信息数量 (用于测试)

h. 销毁所有绑定信息数据 (用于测试)

这里需要知道的真正的RabitMQ是具有图形化界面的,这个界面会将具有多少虚拟机队列,以及绑定信息都显示出来,所以需要底层提供一个获取全部信息的接口。

下面首先是对这个绑定信息的描述类:

这个类因为描述的东西不多所以写这个类也很简单。

然后就是绑定数据持久化管理类了。

上面是这个类最基本的东西,然后我们需要知道的是队列的绑定信息以及交换机的绑定信息要怎么去定义。

因为一个队列只能绑定一个交换机,所以对于一个队列来说一个交换机只会有一个绑定信息。所以队列和绑定类之间的绑定关系就是

但是一个交换机是可以绑定多个队列的

使用上面两种方法来表示是因为我们使用的最多的功能就是获取一个交换机相关的绑定队列,因为当交换机收到一个消息之后这个消息需要放到哪一个队列中是通过这些绑定信息来决定的。

所以当重启服务的函数的返回值返回的应该是ExchangeBindMap通过这个表就能够找到交换机和哪些队列存在关系,然后通过队列中的绑定信息知道匹配规则。

所以这个类的基本构造就是下图

构造函数:

通过上面的两个map通过交换机的名字就能够知道和这个交换机相关的所有队列的信息。在这些队列的信息中就具有一个绑定信息,绑定信息就能够知道这个队列的匹配规则。这里并没有再定义一个交换机和绑定信息之间的关系,因为这样再去定义的话,再去删除就需要再额外删除一些东西。

而使用上面的方法如果要删除交换机相关的映射只需要删除一次即可。

下面就是实现上面的函数了,其实主要就是完成sql语句了,这里需要创建的表名字为binding_table其中具有三个属性一个交换机名称,一个队列名称一个匹配规则,都是varchar

删除表也是直接执行一个sql语句即可。

插入函数:

删除绑定信息函数,这里的删除就是特别指定删除某一个绑定信息。

删除交换机绑定信息(同时也会删除队列和绑定的信息)

删除队列绑定信息,这个函数和上面那个函数的实现方法是一样的,只不过是将exchange_name变成了queue_name

最后就是recover函数了,这个函数和之前的哪些了类的实现方法都是一样的:

只不过因为绑定信息中具有了两个map所以需要先通过交换机的名字获取一个map(这个map就是和交换机相关联的队列的map),这个map中储存的是队列和绑定信息类的绑定。

到这里这个类也就基本完成了。这个类最难理解的就是两个map首先是队列的名字和绑定类的map,另外一个map就是交换机的名字和上面这个map的map,这个map代表的是在这个map中所有的队列都是和这个交换机相关联的。

然后就是最后一个管理连接,以及向外提供接口的类了

上面是这个类的框架,下面要做的就是实现这个类。

构造函数:

然后就是函数的实现了,第一个Bind函数,这个函数的实现逻辑就是狗仔一个队列的绑定信息对象,然后添加到队列和绑定的map中,因为一定会访问map所以需要加锁,当然如果这个映射已经存在了那么也是没有必要进行添加的,除此之外对于绑定是否需要进行持久化,也是一个问题,因为如果绑定持久化了,但是这个绑定对应的队列和交换机都没有进行持久化,那么这个绑定持久化是没有必要的。所以如果队列和交换机都进行了持久化,那么绑定自然要进行持久化,反之任何一个不进行持久化,那么这个绑定也没有进行持久化的必要,但是这样就会存在一个问题,要获取交换机和队列的信息就需要将之前写的两个类包含进来,就会导致几个文件之间的耦合。所以这里我选择了添加一个参数,让上层使用者决定这个连接是否进行持久化。这样就不需要让各个类之间出现耦合了。

下一个就是解除绑定:

这个函数的实现原理就是首先通过交换机的名字得到和这个交换机相关的连接的map,然后在这个map中删除qname的连接信息。

然后是移除交换机相关连接的函数,这个函数就很容易实现了,直接调用持久化类中的删除函数进行删除,再去删除map中这个连接相关的信息。

然后是删除队列相关连接的函数了,这个函数函数的持久化删除很简单,但是难点在于如何从_bindings中删除这个队列的连接信息,因为可能很多的交换机都会绑定这个队列。这里我就选择遍历这个方法了。

下一个 返回某个特定交换机的所有连接:

检测特定的连接是否存在:

逻辑也很简单直接从两个map中进行查找就可以了

下一个获取所有连接数量的函数:

逻辑遍历即可:

获取指定连接信息的函数:

最后的清理函数:

在测试之前,需要知道两个哈希表之间的逻辑关系是怎么样的:

下面就是测试函数:

下面的测试测试了删除交换机/队列的测试

下面时查询的测试

测试结果:

全部通过。再去数据库中检查一下是否存在数据:

没有表的信息,删除成功了。下面就是测试recover从硬盘中恢复的功能了:

现在这个表中具有这些信息,然后我直接进行查询exchange2的测试:

只进行上面这一个测试,如果全部通过则recover功能也是没有问题的。

执行结果:

再去数据库中看一下数据是否存在

exchange1和queue1这个连接还是存在的。到这里这个类的所有功能都测试完成且通过了。

队列消息数据管理

这个模块相比较于上面的模块来说比较复杂,所以需要先理解这个模块的逻辑思想,首先就是消息的要素:

消息的要素: 决定了消息的数据结构

  1. 网络传输的消息要素:

消息属性:

  1.   消息id
  2.   消息routing_key
  3.   消息的投递模式

消息的实际内容:

  1.   数据
  2. 服务器上的消息管理所需的额外要素: 最主要就是持久化管理
  3. 消息有效标志:
  1. 这个字段是需要随着消息的持久化内容一起进行持久化
  2. 每一条消息都有可能要进行持久化存储,等到推送给客户端就会删除掉
  3. 然而每次删除一条数据就重写一次文件,效率太低下
  4. 如果设置了有效标志位,每次只需要将这个有效标志位对应的数据给修改为无效即可
  1. 消息的实际存储位置 (相对于文件起始位置的偏移量):
  1. 当要删除某条消息时,需要重写覆盖这条消息在文件的对应位置(将有效标志位置为无效)
  2. 这时候就需要能够找到这条消息
  1. 消息的长度:
  1. 当恢复历史消息以及读取消息内容的时候,需要解决粘包问题

根据以上的内容就构造出了protobuf文件,这里直接定义出了消息,因为之前已经基本做完了消息的定义,这里直接将这些信息提取出来即可。并且这个类已经由proto文件完成并且生成了

所以这里类是不需要我自己来写代码的

然后就是消息的持久化管理类了:

消息的持久化管理:

不使用数据库:有些消息较大,不适合数据库,其次消息的持久化主要是为了备份,而不是为了查询,因此直接使用普通文件进行存储。

  1. 以队列为单元进行消息的持久化管理: 每个队列都有一个自己的数据文件
  1. 当消息文件垃圾回收时,需要重新加载所有有效消息,重新生成新的数据文件。
  2. 但是,生成新的数据文件后,消息的存储位置就发生了变化,这时候需要更新内存中的数据。
  3. 这时候,就需要将所有的队列数据进行加锁,然后进行更新——锁冲突频繁,效率低。
  4. 因此,如果每个队列都有自己独立的数据文件,则每次只需要对操作的队列数据进行加锁即可。
  1. 既然是数据存储在文件中, 那必然就会有数据格式的要求
  1. 4字节长度|数据|4字节长度|数据......
  2. 通过4字节长度,描述消息实际存储长度,就可以解决粘包问题。
  3. 每次先读取4字节的长度,就能够知道这一个包的长度读取,完成之后再去读取下一个包。

然后就是这个类要向外提供的操作:

向外提供的操作:

  1. 消息文件的创建与删除
  2. 消息的新增持久化/删除消息的持久化 (并不是真正的删除,只是将标志位置为无效)
  3. 历史数据恢复/垃圾回收

什么情况下需要垃圾回收:

  • 因为每次删除数据都不是真正删除,因此文件中的数据会越来越多,但是也不是每次删除都需要回收。
  • 当文件中有效消息超过2000条,且其中有效消息比例低于50%。

回收思想:

  • 加载文件中所有有效消息,删除源文件,生成新的数据文件,将数据写入(存在风险,比如我删除了源文件,但是写入新文件失败了怎么办)。
  • 加载文件中所有有效消息,先写入到一个临时文件中,然后再去删除源文件,将临时文件名称改为源文件名称。(比起上一个思想更加安全,并且修改文件名字是没有多少性能消耗的,写入临时文件如果失败了,但是源文件没有删除,有效数据还是在的)
  • 返回所有的有效消息(每条消息中都记录当前的新的存储位置——用于更新内存中数据的内容,必须返回否则就找不到这个消息在硬盘中的具体位置了)。

需要管理的数据:

  1. 队列名
  2. 根据队列名生成数据文件名称: 队列名.mqd
  3. 根据队列名生成的临时文件名称: 队列名.mqd.tmp

以上是关于消息持久化管理和消息本省的定义。

这里先完成这个消息持久化管理类,再继续往下定义新的类。

首先是这个消息数据持久化类的最基本框架:

然后首先是构造函数

然后是创建消息数据文件的接口:

删除消息数据文件的接口

然后就是往队列中进行消息数据的插入了,也就是添加一个新的数据在消息数据文件的后面。

需要进行的步骤:

首先就是对消息中的有效数据进行序列化(因为之后要 进行发送),然后是获取这个消息队列文件的长度,然后是将序列化后的信息写入到这个消息数据文件的最后(通过文件的大小移动偏移量来实现),当然这个写入是可能失败的,最后再更新这个新插入mess中的具体信息

下一个从消息队列文件中删除一个消息的函数,这里需要知道的就是删除并不是删除,而是表示这个消息可以被覆盖了,也就是将这个消息的是否有效位设置为0(当初写proto文件的时候这个标志位设置为string,所以可以设置为0)

从上图就可以看到是否有效字段是在payload这个类的内部的

但是上面这样写是不可行的,因为msg-> plaload()返回的是一个const对象无法进行修改,所以这里需要使用msg->mutable_plaload()这样返回的plaload对象就不是一个const对象了,做出了修改之后,需要对这个消息的有效载荷重新进行序列化,然后再次写入到原来这个消息所在队列数据文件的位置,完成硬盘中数据的更新,但是如果这个序列化后的字符串,不等于这个文件之前在数据文件中的大小,那么无法完成写入,因为此时如果继续写入会导致其它文件的有效数据被覆盖。

然后就是最后一个最重要的函数了,垃圾回收函数(没有在框架中声明函数名,因为当时忘记了),当这个函数触发后会删除数据文件中的无效数据,只保留有效数据。有效数据会暂时被保存在一个临时的文件中,最后删除源文件,修改临时文件名字为源文件名字,最后以链表的形式返回新的有效数据。

下面是第一步加载历史数据中的所有有效数据:

这里的逻辑就是按照消息的储存结构 4字节消息的长度 消息的数据 4字节消息的长度.....将每一个消息都拿到,然后根据标识位判断这个消息是否还有效。有效就放到链表中,否则就不放。

然后就是将有效数据写入到临时文件中了,这里我写一个辅助函数来帮助我进行书写,这个函数就是将一个message对象中的数据写入到指定文件中,需要注意,写入的逻辑是首先写入这个文件数据的长度,然后再写入文件数据,方便读取的时候首先读取到长度,知道接下来要读取的数据长度为多少

完成了这个函数之后,先回到之前写的insert函数中,那个函数就可以复用这个接口了。

回到垃圾回收函数中,使用这个辅助函数也能够完成将链表中的内容写入到临时文件中

然后再删除源文件,再将临时文件的名字修改为源文件的名字

这样这个函数就完成了,但是现在还有一个问题就是这个函数显的有点冗余了,这里可以将加载有效数据封装为一个新的函数。

后面我修改了一下格式变成了size_t长度的数据长度大小,然后是数据

这里还有一个点需要注意:对读取到的信息进行反序列化,读取到的信息是mes中的plaload中的body中的数据,而不是直接让mes进行反序列化,这个错误我犯了,导致后面寻找bug很长时间,还有就是写入的时候,首先写入的应该是body的数据长度,然后才是正式的数据。这之前我忘记了这个协议导致写入的时候直接就写入了数据,而没有写入数据有效长度的大小。导致出现了很多问题

由load函数来完成读取有效数据的功能,这样这个垃圾回收函数只需要调用这个函数即可。

完成了这个load函数之后我想到了这个load函数是存在错误的,如果load读取的是一个空的文件呢?那么就不会进入到遍历的接口,那么临时文件也就不会完成创建,临时文件不会创建之后又删除了源文件自然会影响之后的读取导致出现错误。所以这里需要修改,需要创建一个临时文件,这样也就不会出现问题了。也就是在第二步的前面加上一行新代码:

瞬间就让这个函数不显得冗余了,gc垃圾回收函数也就完成了。到这里消息持久化管理类就完成了。

下面就是消息数据的内存管理了,对于内存消息数据管理有一个特点:如果内存中所有的消息整体进行管理,则在进行垃圾回收以及恢复历史消息上就会变得麻烦。因此每个队列都有一个消息数据的管理结构,然后最终向外提供一个总体的消息管理类。

队列消息管理:

  1. 构造对象时:创建/打开队列数据文件,恢复队列历史消息数据
  2. 新增消息/确认消息(删除)
  3. 垃圾回收:当持久化数据总量超过2000,且有效比例低于50%则进行垃圾回收
  4. 获取队首消息
  5. 删除队列所有消息
  6. 获取待推送消息数量
  7. 获取待确认消息数量
  8. 获取持久化消息数量

需要管理的数据:(因为这个类中的待推送数据结构频繁使用到了头插,头删等操作,所以综合来看使用链表比使用vector效率更高一点)

  1. 持久化的管理句柄
  2. 待推送消息链表:以头插尾删的思想实现队列功能
  3. 持久化消息的hashmap:垃圾回收后需要更新消息数据(实际存储位置)
  4. 待确认消息的hashmap:一条消息被推送给客户端,并不会立即真正删除,而是等到被确认后才会删除。一条消息被推送给客户端后,取出待推送链表,加入到待确认结构中,等到确认后再删除。
  5. 持久化文件中有效消息数量
  6. 持久化文件中总体消息数量:可以计算出文件中有效消息比例(根据这个比例和总体数量决定是否进行垃圾回收)

有了上面的信息之后下面就可以根据上面的信息来写代码了。

下面就是消息数据类内存管理类的基本框架(可能后面还会添加新函数)

后面我修改了这个类的名字:

上面这个类更加符合一点

然后就是去完成内部的功能了。首先就是完善构造函数

下一个函数就是insert,这个函数需要以下几个步骤,第一步构造消息对象,第二步判断这个消息是否需要持久化,第三步进行持久化存储,第四步:进行内存管理。但是还有一种情况那就是客户端只是发送了一条消息,但是并没有给这个消息设置任何的属性,此时就需要知道这个队列是否进行了持久化来决定这个消息是否进行持久化了,但是此时又会导致耦合度问题,所以这里依旧是添加一个参数,这个参数就是是否对这个消息进行持久化,如果客户端发送的消息中具有属性,那么就使用客户端传入的属性,没有就使用这个参数来决定是否进行持久化,同时也需要这个函数自己构造消息id,匹配规则。

到这里这个insert函数就完成了,下一个函数取出待推送列表中首部的消息,然后再将这个消息放入到待确认的哈希表中,这个函数的思想就是这个

写到这里我发现了我上面代码的问题,那就是这些代码都是可以被多个执行流访问的,那么这个类中共有成员(哈希表和链表)就成为了公共资源,为了防止出现问题,需要加上锁。

insert函数加上锁:

front函数加上锁:

下一个函数:删除消息函数,这个函数的思路为:在待确认消息哈希中寻找这个消息如果找到了这个消息,根据这个消息的持久化模式判断是否需要进行持久化的删除。下一步删除持久化信息,同时此时需要通过计算有效信息的比例来判断是否需要执行垃圾回收下一步删除内存信息。这里我先完成一个检查是否需要进行垃圾回收的函数,这个函数是私有的,因为并不需要提供给外层

在进行了gc之后会返回一个链表,这个链表中的信息都是有效的消息,此时的成员变量(总消息数量和有效消息数量都是一样的),还需要更新消息的实际储存位置的信息,这里我就再将这些功能整合成为一个新的函数。

之后在remove中直接调用跟这个gc函数就能够完成垃圾回收等一系列的工作了。游戏就能够来完成remove函数了:

最后还剩下几个数量函数

最后还有一个清理所有数据的函数:

到这里消息队列管理函数就写完了,下面要做的就是整体的消息队列管理了,也就是将各个消息队列统筹起来进行管理了,这个类会提供接口给上层。统筹管理所有的消息队列。

实现一个对外的总体消息管理类:

管理的是每一个队列的消息

管理的成员:

  • 互斥锁
  • 每个队列的消息管理句柄:队列名称 & 队列消息管理句柄的哈希表

提供的操作:

  1. 初始化队列的消息管理句柄:创建队列的时候调用
  2. 销毁队列的消息管理句柄:删除队列的时候调用
  3. 队列的各项消息操作:
  1. 向队列新增消息
  2. 获取队列队首消息
  3. 对队列进行消息确认
  4. 获取队列消息数量:可获取消息数量,持久化消息数量,待确认消息数量,总的持久化消息数量
  5. 回复队列历史消息

下面就来写代码

统筹管理所有消息队列的类了,首先就是大体框架的书写:

首先就是构造函数了,这里需要知道的是什么时候会构造这个管理对象呢?自然是服务器启动的时候了,那么服务器启动的时候,还需要完成一件事情那就是恢复历史消息。既然要恢复历史消息,就需要知道需要恢复消息队列的队列名字,这也是为什么在另外一个文件中写的队列管理文件中会返回一个所有的队列:

通过返回整个队列的名字,来进行队列中消息的恢复,但是我要使用这个map就必须包含另外一个文件的头文件,让模块之间产生了耦合关系。这里在构造函数中就先不这么做,而是交给之后的虚拟机整合的时候由虚拟机来完成。这里先不做这个工作。

不做这个工作之后初始化函数就很简单了直接给basedir赋值即可,然后就是

初始化某一个消息队列的函数,这个函数的逻辑就是在整体队列的映射管理表中检测是否存在这个队列,不存在就插入,存在就什么都不做

但是这里其实是存在一个问题的那就是在构造queue的时候是会进行历史数据的恢复的,而这个操作内部又使用了一个锁去进行保护。这就导致了锁的嵌套问题,锁的嵌套容易发生死锁(比如现在一个线程2正在进行数据恢复(获取了锁2),而线程1获取了锁1正在等待线程2释放锁2,但是线程2需要去获取锁1就导致了死锁问题,当然我这里发生死锁的概率较低,但是不是没有)。所以这里为了能够解决我就将QueueMessage中的历史数据恢复功能单独组成一个函数去进行调用。

然后修改上面的代码:

这样就避免了锁嵌套的问题。

下一个函数销毁消息队列,这个函数的实现逻辑也很简单,在映射表中寻找这个队列,找到了就销毁,没找到就什么都不做。

为了解决锁的嵌套问题使用了作用域。

现在初始化队列句柄和销毁队列句柄都完成了,下一个insert函数

这个函数的实现逻辑就是首先在map中寻找队列句柄,然后使用这个句柄完成新增即可(也要注意锁嵌套的问题)

front函数和上面这个函数实现逻辑基本一样

确认消息ack函数的实现逻辑也是这样的

然后就是几个获取数量的函数了,这些函数的实现逻辑很简单就是获取句柄,然后通过句柄调用返回数量即可

逻辑都是和上面这个图一样的,只不过最后句柄调用的函数不同而已,这里就不展示了。

这里还剩下最后一个函数,清空所有队列中所有信息的函数:这个函数的实现逻辑就是遍历map,然后让map中所有的句柄都执行一次内部的clear函数即可

这里因为清理的时候,即使嵌套了锁,但是clear函数并不会去访问外部被锁保护的资源,所以不会导致死锁问题。所以就这样写了

到这里消息队列和消息数据的管理就完成了,还剩下的就是单元测试了。

首先来进行插入测试:

这里的清理函数我先不执行,看执行了上面的代码之后是否会存在文件。

测试全部通过了,再看数据文件中的内容:

也是4条消息,并且还包含了一些其它的持久化内容.下面是插入和查询测试,首先运行插入测试然后直接运行下面的查询测试,看是否能够通过。

下面测试运行一下:

测试全部通过,说明查询也是没有问题的,现在文件中也是存在内容,注释上面两次的测试进行恢复历史数据的测试:

测试全部通过也就说明恢复功能也就测试通过了,再去看一下临时文件的大小:

也是存在数据的,并且和写入时候的数据大小也是一样的。

下一个测试:删除测试,删除测试也就是从wait哈希表中确认一条消息

这里我在进行了删除测试之后,又接着进行查询的测试

测试通过:

证明删删除也是没有问题的。

最后就是销毁测试了,也就是clear函数,这个测试我之前已经测试过了可以通过这里就不做专门的测试了,对于这种复杂的模块遇到问题需要使用gdb+打印来寻找错误,并进行修改

到这里这次测试就完成了,这个模块我遇到的问题首先就是垃圾回收(gc)中写入数据时没有按照格式进行写入,没有写入数据的长度,而是直接写入了数据,读取的时候也是先读长度再读取数据,然后就是在反序列化的时候应该是Message中的playod中的数据进行反序列化,但是我直接对message中的数据进行了反序列化,导致读取数据失败了。排查很难

到这里这个模块就没有大的问题了,测试也就通过了。

虚拟机管理模块

这个模块是对上面所有模块的一个整合,虚拟机是一个虚幻的概念是数据管理单元的一个载体,这个模块就是一个数据的整合单元,整合的就是消息,队列,绑定信息等的一个类。所以定义的虚拟机的类就有以下的成员。

定义虚拟机类包含以下成员:

a. 交换机数据管理模块句柄

b. 队列数据管理模块句柄

c. 绑定数据管理模块句柄

d. 消息数据管理模块句柄

有了这个类就可以不断的去实例化虚拟机对象,然后这个对象需要对外提供数据管理功能:

  1. 提供声明交换机的功能(存在则OK,不存在则创建)
  2. 提供删除交换机的功能(删除交换机的同时删除关联绑定信息)
  3. 提供声明队列的功能(存在则OK,不存在则创建,创建的同时创建队列关联消息管理对象)
  4. 提供删除队列的功能(删除队列的同时删除关联绑定信息,删除关联消息管理对象及队列所有消息)
  5. 提供交换机-队列绑定的功能
  6. 提供交换机-队列解绑的功能
  7. 提供获取交换机相关的所有绑定信息功能
  8. 提供新增消息的功能
  9. 提供获取指定队列队首消息的功能
  10. 提供消息确认删除的功能

这10个操作其实就是对上面模块的功能进行整合,然后通过虚拟机向外提供出去。

然后就是虚拟机的管理了,但是因为我的这个项目简化了虚拟机,只提供了一个虚拟机的操作,所以虚拟机的管理操作(对虚拟机进行增删查)并没有使用到,所以先不做这个功能,之后可能会实现。想要实现也很简单,只需要再创建一个数据库表,然后将各个虚拟机的信息放入到表中,然后通过查询表来实现恢复虚拟机的功能,这样就可以让项目对外提供多个虚拟机。

在写架子之前,我先解决一下之前模块的头文件重复包含的问题

也就是在这些头文件的头部和尾部增加一个

其它的文件我就不展示了,定义的宏名则自己起。

下面就来写虚拟机的架子:

然后就是去实现这个框架了。

首先就是构造函数了:

构造函数的实现逻辑就是实例化这几个对象,然后需要完成一个工作那就是历史数据的恢复,这里的恢复就是要将上面所有模块的历史数据都要进行一次恢复。一个一个来首先来进行队列的数据恢复,要进行队列数据的恢复,就需要先获取到队列的名字(这个接口在队列管理类中已经实现了)

这个构造函数中虽然有一个虚拟机的名字但是在我的这个简化项目中其实只有一个虚拟机,所以这里这个字段其实没有太大的作用,但是为了以后的拓展我还是加了。

然后就是交换机相关的操作

首先是交换机的声明操作,这个代码很简单直接调用底层的方法即可。

删除交换机的方法就相比来说复杂一点,因为删除一个交换机的时候和这个交换机相关的数据(绑定数据需要一起删除,不能删除绑定的队列,因为一个队列是可以被多个交换机进行绑定的),虽然这两个操作也是调用一下函数即可

然后是队列的相关操作了,队列比起交换机就比较复杂了,因为和队列相关的操作就比较多了,队列不止是创建的时候具有相关性,删除也具有相关性。和队列相关的信息首先就是绑定信息,然后就是消息,每一个队列创建的时候都需要初始化对应的消息句柄,同理删除的时候这些消息也要一起被删除。

所以对于创建队列的函数首先就要完成初始化的工作,然后才是添加的操作。

这样才是正式完成了一个队列的添加,打个比方就是我要开一个加盟店,那么首先我要去管理加盟店的机构中完成对我的这个加盟店的注册,然后才是去开加盟店。

删除的时候和对象相关的数据存在两个一个是队列的消息,一个队列的绑定信息

然后就是和绑定相关的操作了,这里需要知道的是要将一个队列绑定到一个交换机上是存在相关要求的,首先就是队列和交换机必须是存在的,另外一个绑定信息是否要进行持久化的标准是队列和交换机都要进行持久化,那么这个绑定信息也要进行持久化。

解除绑定则没有这么麻烦的操作,直接调用解除绑定即可。

下一个接口是获取交换机的所有绑定信息的接口,直接调用函数即可。以上的函数其实都只是对之前完成的接口的一个整合。

下面是对消息队列中的消息进行的操作,首先就是对消息进行发布的一个接口,要实现这个功能直接调用底层的方法即可,但是这里需要思考一个点就是如果我要求这个消息进行持久化,但是这个队列是不进行持久化的,那么这个消息也是没有必要进行持久化的,所以这里需要进行一点操作。

这里首先需要修改一下之间在消息队列中对于一个消息的插入接口的方法:

主要创建了一个mode变量,如果队列要进行持久化,那么mode就等于这个消息的持久化策略,否则这个消息一定不会进行持久化(即使消息要求进行持久化)

下一个消费一个消息的函数,也就是从消息队列中取出一个消息的方法。直接调用函数即可。

下一个函数确认一个消息,依旧是调用函数即可

这样这个类也就完成了,并且这个类其实不太需要测试因为这个类主要就是调用了前面写的方法,但是因为我修改了一点代码所以这里我还是进行测试。最后不要忘了这个类依旧要防止头文件的重复包含。

这里为了测试我还在类中额外添加了一个clear和判断某一个交换机/队列/绑定信息是否存在的操作。

这两个操作都是为了测试而增加的

然后就是测试模块的编写了,首先是数据的准备和清理:

这样就建立了三个交换机,三个队列。然后三个队列中分别存在3条信息.

首先进行初始化的测试:

直接判断这些交换机,队列和绑定信息都是应该存在的,然后就是获取一条消息

直接测试看是否通过:

测试全部通过,并且数据库文件和消息数据文件都是存在的:

这里没有被清理是因为我没有调用删除函数。

下一个就是删除交换机的测试:要验证的就是在删除交换机的时候绑定信息是否会被删除

测试结果:

全部通过。然后是对于队列的删除测试,删除了之后还需要进行消息的获取测试,此时获取的消息应该是nullptr

依旧是全部通过

这就说明了队列删除和交换机删除是没有问题的。下一个ack功能的测试为了让消息删除显示的更加明显,我在ack的底层调用函数中增加了一个日志打印

然后再去测试ack功能:

直接进行测试:

测试全部通过并且打印出来我想要的信息

交换机路由管理

现在当一个用户想要发布消息的时候,这个用户可能不知想往一个队列中发布消息,而是想往多个队列中进行信息的发布。比如说,这个用户发布的这个新闻可能既是个体育新闻也是个花边新闻,那么这个信息就要放到两个队列中去。如果只能一个新闻专门放入到一个模块中的话,这个新闻就要发布两次信息,分别放入到两个不同的队列中才能将这个信息发布完成,这样对用户的使用来说很不方便,由此就在AMQP协议中提出了一个交换机的概念,用户在发布信息的时候并不会将消息直接发布到某个队列中,而是先发布到交换机上,交换机在进行路由匹配决定这个信息,应该发布到哪些队列中。当然创建交换机的时候,交换机和队列之间的关系也是由用户自己来进行设定的。设定完成之后,用户发布信息到交换机中,交换机就会将这个信息发布到对应的队列当中,交换机要将信息发布到哪一个队列中,就由匹配路由来进行决定。在设计这个模块的时候,大佬们就提出了不同类型的交换机的概念。以及当数据发布到某一个队列的时候就有了两个决定性的因素:第一个就是交换机类型:存在三种类型的交换机

  1. 广播交换:直接将消息交给所有绑定的队列,无需匹配。
  2. 直接交换:队列绑定信息中的 ​​binding_key​​​ 与消息中的 ​​routing_key​
  3. 主题交换:只有匹配队列主题的消息才会被放入队列中。

第二个要素就是匹配规则。有了这两个要素之后,再结合交换机和队列的关系画一张图,表示一个用户将信息发布给交换机,交换机再经过匹配规则决定这个信息要发布到哪一个队列中。

如果交换机的类型是一个广播交换,那么这个news就会被直接发布到后面的三个模块中,如果这个交换机是一个直接交换,那么这个消息就只会被发布到队列1中,因为只有这个队列的binding_key和routing_key是完全一样的。如果交换机的类型是一个主题交换,则是如果binding_key和routing_key符合一个匹配规则(这个匹配规则一般不可能是让binding_key和routing_key是一样的,因为如果是这个机制为什么不使用直接交换呢?),则会进行发布否则不进行发布。简单理解这个匹配规则就是在binding_key或者routing_key中存在一些通配符,这个通配符可以匹配一些特定/任意的单词。比如果上图中第一个队列的binding_key为news.#然后用户发布新闻的routing_key为news.tidbits此时这个消息可以匹配到第一个队列,因为#可以匹配任何符号,而第二个队列则完全一样自然也是可以匹配的,第三个队列不匹配。这个匹配规则详细的说明在后面。并且为了更加方便进行匹配,binding_key和routing_key也是具有自己的规则的。

由此就能够整合出这个模块的功能了:

路由交换模块:

功能:判断一个消息中的routing_key与队列的binding_key是否匹配成功。

取决要素两个:交换机类型,routing_key与binding_key的匹配

因此基于功能需求分析:路由交换模块只需要对传入的数据进行处理即可,因此这个模块要实现的实际上是一个功能接口类(没有成员变量)。

提供的功能:

  1. 判断routing_key是否合法:必须是a~z, A~Z, 0~9, . _ 组成。(不需要通配符,因为routing_key只是描述一个新闻类型的字符串)
  2. 判断binding_key是否合法:
  1. 必须是a~z, A~Z, 0~9, * # . _ 组成。
  2. 注意:* # 是通配符,必须独立存在,* 和 # 不能连续出现。(需要通配符,因为需要进行规则匹配)
  1. 进行routing_key与binding_key的路由匹配:
  1. 广播:不管如何都是成功的。
  2. 直接:相等则成功。
  3. 主题:按匹配模式,完成了则成功。

以上就是这个模块需要做的事情以及需要提供的操作,总的来说这个模块其实是一个功能模块,只需要判断传入的字符串是否符合特定的规则即可,所以这个模块是没有需要进行管理的数据的。当然想要实现主题匹配也是比较难的,这个功能最后去完成,先去完成其它的功能。主题这个功能需要单独留出时间去进行考虑。

下面首先来完成这个功能类的架子搭建

然后就是去完成这个三个函数了,首先来完成比较简单的前面两个函数。首先是判断routingkey是否符合规则的函数,这个函数最是简单,只需要判断字符串中是否存在不符合规则的字符即可。

然后就是bindingkey的合法性判断了。

这个合法性判断其实也很简单,只不过是添加了几个东西而已,首先是合法字符增加了通配符,然后是通配符必须独立存在,不能和其它字符一起,还有一个routingkey只能存在一个通配符,#通配符前面或者后面不能连续出现其它的通配符。

最后就是匹配规则函数的编写了,采用的方法为动态规划。

主题匹配涉及到一个概念就是队列和交换机之间的一个匹配,如何匹配呢?首先bindingkey和routingkey都是以.作为间隔的一个单词,这个单词就是对用户发送消息的一种描述,也是对这个队列能够接收什么消息的一种描述:首先是直接匹配,下面两个字符串:

因为music和#是不一样的,所以两者匹配就失败了,但是如果是主题匹配因为#是一个统配符就不能凭借但是是否一致来进行决定了,还需要额外的算法来完成这个过程。这个算法是是什么呢?请看下面的过程说明:

首先是两个字符串,然后对字符串进行分隔之后在二维数组中进行各个分割字符串的比较:

可以看到routingkey的ddd和bindingkey的ddd是匹配成功了,但是这并不意味着这就是匹配成功了,因为ddd前面的父级没有匹配成功,所以如果两个单词匹配成功了应该从数组的左上方继承结果,如果没有匹配成功那么这里的结果就是0。由此能够得到一个动态转移方程,dp[i][j]代表的是bindingkey的以i分割字符为结尾的字符串和routingkey的以第j个分割字符串为结尾的字符串是否匹配成功。

由此就能够得到状态转移方程:

dp[i][j] = 如果i位置的字符串和j位置的字符串能够匹配成功,那么dp[i][j] = dp[i-1][j-1](可以理解为这一个字符串匹配成功了,但是之前的字符串是否匹配成功的结果就在dp[i-1][j-1]中)如果没有匹配成功dp[i][j] = false。但是还有通配符的存在:比如下面这个例子:

因为存在通配符导致aaa匹配#从左上方得到结果是1,但是bbb和#进行匹配得到的结果虽然成功了但是是0,导致匹配失败,但是#其实可以匹配0个或者多个任意的单词,这个匹配应该是成功的,此时就需要进行专门的考虑,当遇到#匹配成功的时候不仅能够从左上角获得结果,也可以从前一列获取结果也就是dp[i][j]中如果i等于#那么dp[i][j] = dp[i-1][j-1]/dp[i][j-1]。但是还是无法解决所有的问题比如下面这个:

所以这个#通配符还可以从上方进行结果的得到。

下一个例子:

aaa和aaa匹配之间进行匹配成功,但是因为左上方为0,导致匹配失败,但是单词之间的匹配无法从上方进行结果获取,要解决这个问题也很要解决,当bindingkey是以#作为开头字符就将左上方的这个值从0修改为1即可 。*号不需要进行考虑,*匹配的是任意一个单词,并不会匹配多个单词,所以遇到*号可以认为遇到了和本单词一样的单词即可。到这里这个算法就结束了,可以进行代码的书写了

到这里路由匹配模块就完成了,下面就是测试了。

因为这个类没有什么数据需要进行初始化,所以初始化也不需要进行数据的初始。

首先来测试routingkey和bindingkey的合法性接口:

最后的测试结果:

全部通过,证明routingkey和bindingkey的合法性不需要进行验证了。

还剩下最后一个route测试了,首先需要准备好一堆的routingkey和bindingkey数据:准备的数据如下:

测试的逻辑就是准备三个数组,分别储存routingkey bindingkey和这一次route的结果,然后执行一次for循环遍历数组,每一次都拿取一个routing_key和binding_key进行比较,然后和答案进行比较即可。这里主要就是测试了主题交换,另外两个交换没有测试的必要

测试全部都通过了,这就说明这个路由匹配模块也是没有问题了。

相关文章:

仿RabitMQ 模拟实现消息队列项目开发文档2(个人项目)

项目需求分析 核心概念 现在需要将这个项目梳理清楚了&#xff0c;便于之后的代码实现。项目中具有一个生产消费模型&#xff1a; 其中生产者和消费者的个数是可以灵活改变的&#xff0c;让系统资源更加合理的分配。消息队列的主逻辑和上面的逻辑基本一样&#xff0c;只不过我…...

李佳琦回到巅峰背后,双11成直播电商分水岭

时间倏忽而过&#xff0c;又一年的双11即将宣告结束。 从双11正式开始前的《新所有女生的offer》&#xff0c;到被作为“比价”标杆被其他平台直播间蹭、被与其他渠道品牌比较&#xff0c;再到直播间运营一时手快多发了红包……整个双11周期下来&#xff0c;李佳琦直播间在刷新…...

云计算在教育领域的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 云计算在教育领域的应用 云计算在教育领域的应用 云计算在教育领域的应用 引言 云计算概述 定义与原理 发展历程 云计算的关键技…...

C语言 | Leetcode C语言题解之第543题二叉树的直径

题目&#xff1a; 题解&#xff1a; typedef struct TreeNode Node;int method (Node* root, int* max) {if (root NULL) return 0;int left method (root->left, max);int right method (root->right, max);*max *max > (left right) ? *max : (left right);…...

6、If、While、For、Switch

6、If、While、For、Switch 一、If 1、if-else if (boolean) {代码块 } else if (boolean) {代码块 } else if (boolean) {代码块 } else { // 默认情况代码块 }关于IDEA单元测试控制台不能输入数据的问题&#xff1a; https://blog.csdn.net/m0_72900498/article/details/…...

萤石设备视频接入平台EasyCVR多品牌摄像机视频平台海康ehome平台(ISUP)接入EasyCVR不在线如何排查?

随着智慧城市和数字化转型的推进&#xff0c;视频监控系统已成为保障公共安全、提升管理效率的重要工具。特别是在大中型项目中&#xff0c;跨区域的网络化视频监控需求日益增长&#xff0c;这要求视频监控管理平台不仅要具备强大的视频资源管理能力&#xff0c;还要能够适应多…...

【多线程】线程池如何知道一个线程的任务已经完成

目录 1. 说明2. 任务的生命周期3. 状态更新4. 线程间的协作5. 内部数据结构6. 回调与通知7. 线程池的关闭与清理 1. 说明 1.线程池通过一系列内部机制来知道一个线程的任务已经完成。2.这些机制主要涉及任务的生命周期管理、状态更新以及线程间的协作。 2. 任务的生命周期 1…...

Transformer介绍(一)

Transformer是一种特殊的神经网络&#xff0c;一种机器学习模型。 谷歌在2017年推出的原版Transformer&#xff0c;论文《Attention Is All You Need》&#xff0c;专注于将一种语言的文本翻译成另一种。 而我们要关注的Transformer变种&#xff0c;即构建ChatGPT等工具的模型…...

[CKS] TLS Secrets创建与挂载

目前的所有题目为2024年10月后更新的最新题库&#xff0c;考试的k8s版本为1.31.1 BackGround 您必须使用存储在TLS Secret中的SSL文件&#xff0c;来保护Web 服务器的安全访问。 Task 在clever-cactus namespace中为名为clever-cactus的现有Deployment创建名为clever-cactu…...

java双向链表解析实现双向链表的创建含代码

双向链表 一.双向链表二.创建MyListCode类实现双向链表创建一.AddFirst创建&#xff08;头插法&#xff09;二.AddLast创建&#xff08;尾叉法&#xff09;三.size四.remove(指定任意节点的首位删除)五.removeAll(包含任意属性值的所有删除)六.AddIndex(给任意位置添加一个节点…...

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的&#xff0c;之后再看看能能出一个公司业务场景中的消息流。 一、下载github.com/segmentio/kafka-go包 go get github.com/segmentio/kafka-go二、建立kafka连接 正常来说下面的配置host topic partition 应该写在…...

redis:set集合命令,内部编码,使用场景

个人主页 &#xff1a; 个人主页 个人专栏 &#xff1a; 《数据结构》 《C语言》《C》《Linux》《网络》 《redis学习笔记》 文章目录 前言命令SADDSMEMBERSSISMEMBERSCARDSPOPSMOVESREM集合间操作SINTERSINTERSTORESUNIONSUNIONSTORESDIFFSDIFFSTORE 内部编码使用场景总结 前言…...

45期代码随想录算法营总结

代码随想录训练营总结与收获 在为期60天的代码随想录训练营结束后&#xff0c;我感慨良多。这段时间不仅让我在编程技能上有了明显的提升&#xff0c;更让我在学习习惯和时间管理上有了深刻的反思和改变。 报名参加这个训练营对我来说是一个重要的监督机制。之前我总是拖延&a…...

深入理解Java中的instanceof关键字及接口新特性:方法实现的可能性

目录 引言 1. 什么是instanceof关键字&#xff1f; 1.1 语法结构 1.2 instanceof的用法示例 1.3 instanceof的应用场景 2. Java中的接口能包含方法实现吗&#xff1f; 2.1 默认方法&#xff08;Default Method&#xff09; 2.2 静态方法&#xff08;Static Method&…...

【python中如果class没有self会怎行】

python中如果class没有self会怎样TOC 在Python中&#xff0c;self是一个约定俗成的名称&#xff0c;用于表示类的实例。如果没有使用self&#xff0c;会导致以下问题&#xff1a; 1、无法访问实例属性&#xff1a; 在类的方法中&#xff0c;如果没有self&#xff0c;方法将无…...

【算法】(Python)动态规划

动态规划&#xff1a; dynamic programming。"programming"指的是一种表格法&#xff0c;而非编写计算机程序。通常解决最优化问题&#xff08;optimization problem&#xff09;。将问题拆分成若干个子问题&#xff0c;求解各子问题来得到原问题的解。适用于多阶段…...

EasyExcel 学习之 导出 “提示问题”

EasyExcel 学习之 导出 “提示问题” 现象分析解决&#xff08;伪代码&#xff09;前端 POST 实现后端实现 现象 EasyExcel 支持导出 xlsx、xls、csv 三种文件格式。在导出过程中可能发生各种异常&#xff0c;当发生异常时应该提示错误信息而非导出一个错误的文件。 分析 首…...

应用系统开发(3)低功耗四运算放大器LM324N

LM324N 是一种广泛使用的 低功耗四运算放大器,由德州仪器(Texas Instruments)和其他制造商生产。它具有四个独立的运算放大器,能够在单电源或双电源模式下运行,适合多种模拟电路应用。以下是详细信息: 芯片基本信息 型号:LM324N封装类型:常见 DIP(双列直插封装)或 SO…...

基于微信小程序的电商平台+LW示例参考

1.项目介绍 系统角色&#xff1a;管理员、普通用户功能模块&#xff1a;管理员&#xff08;用户管理、商品分类、商品管理、订单管理、系统管理等&#xff09;&#xff0c;普通用户&#xff08;个人中心、收藏、我的订单、查看商品等&#xff09;技术选型&#xff1a;SpringBo…...

[Android] Graphic Buffer 的申请

前言&#xff1a; MediaCodec 支持 texture mode&#xff0c;即MediaCodec解码video完毕后把 yuv 数据填入 GPU 共享出来的 graphic buffer 里面&#xff0c;app 会把 video 的 yuv数据 和 ui 的数据通过通过软件渲染组件(opengl等)发送给GPU 进行一并渲染。这样做的效率较低&…...

【大数据学习 | HBASE高级】storeFile文件的合并

Compaction 操作分成下面两种&#xff1a; Minor Compaction&#xff1a;是选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile&#xff0c;对于删除、过期、多余版本的数据不进行清除。 Major Compaction&#xff1a;是指将所有的StoreFile合并成一个StoreFile&am…...

多平台编包动态引入依赖的解决方案

最近开发时遇到了这样的需求&#xff0c;A 平台需要引入一个 video.js&#xff0c;B 平台却是不需要的&#xff0c;那么面向 B 平台打包的时候把依赖装进去自然就不大合适。最好的方法是动态引入依赖&#xff0c;根据平台来判断要不要引入 动态引入依赖 很快啊&#xff0c;动…...

[单例模式]

目录 [设计模式] 单例模式 1. 饿汉模式 2. 懒汉模式 3. 单例模式的线程安全问题 [设计模式] 设计模式是软件工程中的一种常见做法, 它可以理解为"模板", 是针对一些常见的特定场景, 给出的一些比较好的固定的解决方案. 不同语言适用的设计模式是不一样的. 这里…...

速盾:游戏盾的功能和原理详解

速盾有一款专注于网络游戏安全的防护系统&#xff0c;它通过实时监测游戏网络流量和玩家行为&#xff0c;以及使用先进的算法和技术进行分析和识别&#xff0c;检测出各种外挂、作弊行为和恶意攻击&#xff0c;从而保障游戏的公平性和玩家的安全性。 速盾游戏盾的主要功能包括…...

Spleeter:音频分离的革命性工具

目录 什么是Spleeter&#xff1f;Spleeter的工作原理Spleeter的应用场景Spleeter的技术优势Spleeter的挑战与局限性结论 什么是Spleeter&#xff1f; Spleeter 是一个由 Deezer 开发的开源音频源分离工具。它基于深度学习技术&#xff0c;尤其是卷积神经网络&#xff08;CNN&a…...

【笔记】自动驾驶预测与决策规划_Part6_不确定性感知的决策过程

文章目录 0. 前言1. 部分观测的马尔可夫决策过程1.1 POMDP的思想以及与MDP的联系1.1.1 MDP的过程回顾1.1.2 POMDP定义1.1.3 与MDP的联系及区别POMDP 视角MDP 视角决策次数对最优解的影响 1.2 POMDP的3种常规解法1.2.1 连续状态的“Belief MDP”方法1. 信念状态的定义2. Belief …...

openresty入门教程:access_by_lua_block

在OpenResty中&#xff0c;access_by_lua_block 是一个功能强大的指令&#xff0c;它允许你在Nginx的访问控制阶段执行Lua脚本。这个阶段发生在Nginx处理请求的过程中&#xff0c;紧接在rewrite阶段之后&#xff0c;但在请求被传递到后端服务器&#xff08;如PHP、Node.js等&am…...

Caused by: org.apache.flink.api.common.io.ParseException: Row too short:

Flink版本 1.17.2 错误描述 Caused by: org.apache.flink.api.common.io.ParseException: Row too short: 通过flink中的flinkSql直接使用对应的connector去获取csv文件内容&#xff0c;报获取的数据太短了 可能原因 1.创建的表字段多于csv文件当中的表头 定位 在获取csv…...

hbase的安装与简单操作

好的&#xff0c;这里是关于 HBase 的安装和基本操作的详细步骤&#xff0c;分成几个更清晰的阶段&#xff1a; 第一部分&#xff1a;安装和配置 HBase 1. 环境准备 HBase 依赖于 Hadoop&#xff0c;因此首先确保 Hadoop 已经正确安装和配置。如果没有安装&#xff0c;请先下…...

PySpark本地开发环境搭建

一.前置事项 请注意&#xff0c;需要先实现Windows的本地JDK和Hadoop的安装。 二.windows安装Anaconda 资源&#xff1a;Miniconda3-py38-4.11.0-Windows-x86-64&#xff0c;在window使用的Anaconda资源-CSDN文库 右键以管理员身份运行&#xff0c;选择你的安装路径&#x…...

成都科技网站建设哪里有/不要手贱搜这15个关键词

DNA甲基化作为表观遗传的一种标记&#xff0c;在生长发育和疾病发生过程中扮演着重要角色。随着大规模甲基化研究的进行&#xff0c;积累了大量疾病相关的甲基化数据&#xff0c;DiseaseMeth就是一个保存了人类疾病相关的甲基化信息的数据库。官网地址如下http://bio-bigdata.h…...

广州官网优化/专业优化网站排名

要脱单&#xff0c;你就在公司里这些岗位上下功夫&#xff0c;身边的女生有主了&#xff0c;人家还有闺蜜呢不是&#xff0c;伺候好了说不定哪天就给你介绍一个&#xff0c;接下来不妨从身边的这些女生下手~~~~转载于:https://blog.51cto.com/13457136/2130036...

网站后台上传软件/优化营商环境条例解读

静默安装Oracle时提示&#xff1a;"[SEVERE] - Email Address Not Specified"系统环境&#xff1a;CentOS 6.3 x86_64 Oracle 11gR2解决办法&#xff1a;修改responseFile文件&#xff0c;将DECLINE_SECURITY_UPDATES的值设为true&#xff0c;如果为空系统会假设该值…...

做网站手机版/百度推广有哪些售后服务

2019 CCF大学生计算机系统与程序设计竞赛(Collegiate Computer Systems& Programming Contest, CCF CCSP)总决赛于10月16日在苏州市职业大学正式拉开战幕&#xff0c;来自浙江大学、同济大学等全国75所高校的461位选手从9点开始&#xff0c;到21点进行了一场历时12个小时的…...

学会wordpress 怎么赚钱/百度学术查重

向然学习资料网为同学们提供伍德里奇计量经济学导论第4版网课伍德里奇《计量经济学导论》(第4版)网授精讲班【教材精讲&#xff0b;考研真题串讲】网课目录伍德里奇《计量经济学导论》(第4版)网授精讲班【共54课时】序号 名称 课时1 绪论 00:45:212 第1章 计量经济学的性质与经…...

哪些网站做推广/网站建设网站设计

JS交互与webView的工作原理浅析webView是什么WebView是android中一个非常实用的组件&#xff0c;它和safai、chrome一样都是基于webkit网页渲染引擎&#xff0c;可以通过加载html数据的方式便捷地展现软件的界面。在WebView的设计中&#xff0c;不是什么任务都由WebView类完成的…...