当前位置: 首页 > news >正文

根据源码,模拟实现 RabbitMQ - 内存数据管理(4)

目录

一、内存数据管理

1.1、需求分析

1.2、实现 MemoryDataCenter 类

1.2.1、ConcurrentHashMap 数据管理

1.2.2、封装交换机操作

1.2.3、封装队列操作

1.2.4、封装绑定操作

1.2.5、封装消息操作

1.2.6、封装未确认消息操作

1.2.7、封装恢复数据操作


一、内存数据管理


1.1、需求分析

当前已经使用 数据库 管理了 交换机、绑定、队列,又使用 数据文件 管理了 消息.

最后还使用一个类将上述两部分整合在了一起,对上层提供统一的一套接口.

但对于 MQ 来说,是以内存存储数据为主,硬盘存储数据为辅(硬盘数据主要是为了持久化保存,重启之后,数据不丢失).

接下来就需要使用 内存 来管理上述数据~~

这里我们主要使用 ConcurrentHashMap 来进行数据管理(主要是因为线程安全问题).

交换机:使用 ConcurrentHashMap,key 是 name,value 是 Exchange 对象。

队列:使用 ConcurrentHashMap,key 是 name,value 是 MSGQueue 对象。

绑定:使用嵌套的 ConcurrentHashMap,key 是 exchangeName,value 是一个 ConcurrentHashMap(key 是 queueName,value 是 Binding 对象)。

消息:使用 ConcurrentHashMap,key 是 messageId ,value 是 Message 对象。

队列和消息的关联关系:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是一个 LinkedList(每个元素是一个 Message 对象)。

表示 “未被确认” 的消息:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是 ConcurrentHashMap(key 是 messageId,value 是 Message 对象,后续实现消息确认的逻辑,需要根据 ack 响应的内容,会提供一个确认的 messageId,根据这个 messageId 来把上述结构中的 Message 对象找到并移除)。

Ps:此处实现的 MQ,支持两种应答模式的 ACK

  1. 自动应答:消费者取了元素,整个消息就算是被应答了,此时整个消息就可以被干掉了。
  2. 手动应答:消费者取了元素,这个消息不算被应答,需要消费者主动再调用一个 basicAck 方法,此时才认为是真正应答了,才能删除这个消息。

1.2、实现 MemoryDataCenter 类

1.2.1、ConcurrentHashMap 数据管理

这里就是用 ConcurrentHashMap 来对上述数据进行统一内存管理.

    //key 是 exchangeName, value 是 Exchange 对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();//key 是 queueName, value 是 MSGQueue 对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();//第一个 key 是 exchangeName,第二个 key 是 queueNameprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();//key 是 messageId ,value 是 Message 对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();//key 是 queueName , value 是 Message 的链表private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();// 第一个 key 是 queueName, 第二个 key 是 messageIdprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();

1.2.2、封装交换机操作

主要就是对 exchangeMap 插入、获取、删除交换机.

public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName=" + exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName=" + exchangeName);}

1.2.3、封装队列操作

主要就是对 queueMap 插入、获取、删除队列.

    public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(), queue);System.out.println("[MemoryDataCenter] 新队列添加成功! queueName=" + queue.getName());}public MSGQueue getQueue(String queueName) {return queueMap.get(queueName);}public void deleteQueue(String queueName) {queueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功! queueName=" + queueName);}

1.2.4、封装绑定操作

这里值得注意的是加锁逻辑,并不是加了锁就一定安全,也不是说不加锁就一定不安全,如果这段代码前后逻辑性很强,需要打包成一个原子性的操作,那就可以进行加锁,如果不是那么强的因果,就没必要,因为加锁也是需要开销的,加锁之后的锁竞争更是一个时间消耗。

    public void insertBinding(Binding binding) throws MqException {
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if(bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }//上面这段逻辑可以用以下代码来替换ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());synchronized(bindingMap) {//再根据 queueName 查一下,只有不存在的时候才能插入,存在就抛出异常if(bindingMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName=" + binding.getExchangeName() +", queueName=" + binding.getQueueName());}bindingMap.put(binding.getQueueName(), binding);}System.out.println("[MemoryDataCenter] 新绑定添加成功!exchangeName=" + binding.getExchangeName() +", queueName=" + binding.getQueueName());}/*** 获取绑定有两个版本* 1.根据 exchangeName 和 queueName 确定唯一一个 Binding* 2.根据 exchangeName 获取到所有的 Binding* @param exchangeName* @param queueName* @return*/public Binding getBinding(String exchangeName, String queueName) throws MqException {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if(bindingMap == null) {throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName=" + exchangeName +", queueName=" + queueName);}return bindingMap.get(queueName);}public ConcurrentHashMap<String, Binding> getBindings(String exchangName) {return bindingsMap.get(exchangName);}public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String, Binding>  bindingMap = bindingsMap.get(binding.getExchangeName());//这里操作不是很关键,因此可以不用加锁(加锁不一定就安全,也不是说不加锁就一定不安全,要结合实际场景)//如果这段代码前后逻辑性很强,需要打包成一个原子性的操作,那就可以进行加锁,如果不是那么强的因果,就没必要,因为加锁也是需要开销的,加锁之后的锁竞争更是一个时间消耗if(bindingMap == null) {throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName=" + binding.getExchangeName() +", queueName=" + binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功!exchangeName=" + binding.getExchangeName() +", queueName=" + binding.getQueueName());}

1.2.5、封装消息操作

这里值得注意的是 LinkedList 是线程不安全的,要特殊处理.

    /*** 添加消息* @param message*/public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 新消息添加成功!messageId=" + message.getMessageId());}/*** 根据 id 查询消息* @param messageId*/public Message selectMessage(String messageId) {return messageMap.get(messageId);}/*** 根据 id 删除消息* @param messageId*/public void removeMessage(String messageId) {messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息被移除!messageId=" + messageId);}/*** 发送消息到指定队列* @param message*/public void sendMessage(MSGQueue queue, Message message) {//先根据队列名字找到指定的链表LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());//LinkedList 是线程不安全的synchronized (messages) {messages.add(message);}//这里把消息在消息中心也插入一下。即使 message 在消息中心存在也没关系//因为相同的 messageId 对应的 message 的内容一定是一样的(服务器不会修改 Message 的内容)addMessage(message);System.out.println("[MemoryDataCenter] 消息被投递到队列当中!messageId=" + message.getMessageId());}/*** 从队列中取消息* @param queueName* @return*/public Message pollMessage(String queueName) {LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null) {return null;}synchronized (messages) {if(messages.size() == 0) {return null;}//链表中有消息就进行头删Message currentMessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId=" + currentMessage.getMessageId());return currentMessage;}}/*** 获取指定队列的消息个数* @param queueName* @return*/public int getMessageCount(String queueName) {LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null) {return 0;}synchronized (messages) {return messages.size();}}

1.2.6、封装未确认消息操作

“未被确认” 的消息:使用嵌套的 ConcurrentHashMap,key 是 queueName,value 是 ConcurrentHashMap(key 是 messageId,value 是 Message 对象,后续实现消息确认的逻辑,需要根据 ack 响应的内容,会提供一个确认的 messageId,根据这个 messageId 来把上述结构中的 Message 对象找到并移除)。

    /*** 添加未确认的消息* @param queueName* @param message*/public void addMessageWaitAck(String queueName, Message message) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());messageHashMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 消息进入待确认队列!messageId=" + message.getMessageId());}/*** 删除未确认的消息* @param messageId*/public void removeMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap == null) {return;}messageHashMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息从待确认队列中删除!messageId=" + messageId);}public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap == null) {return null;}return messageHashMap.get(messageId);}

1.2.7、封装恢复数据操作

从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中.

public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {//1.先清空之前所有的数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();//2.恢复所有的交换机数据List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for(Exchange exchange : exchanges) {exchangeMap.put(exchange.getName(), exchange);}//3.恢复所有的队列数据List<MSGQueue> queues = diskDataCenter.selectAllQueue();for(MSGQueue queue : queues) {queueMap.put(queue.getName(), queue);}//4.恢复所有绑定数据List<Binding> bindings = diskDataCenter.selectAllBindings();for(Binding binding : bindings) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(), binding);}//5.恢复所有的消息数据for(MSGQueue queue : queues) {LinkedList<Message> messages = diskDataCenter.loadAllMessagesFromQueue(queue.getName());queueMessageMap.put(queue.getName(), messages);//遍历所有的队列,根据每个队列名字。来恢复所有消息for(Message message : messages) {messageMap.put(message.getMessageId(), message);}}}

Ps;“未确认的消息” 这部分数据不需要从硬盘中恢复,之前硬盘存储也没有考虑过这里~

一旦在等待 ack 的过程中,服务器重启了,这些 “未被确认的消息” 就恢复成了 “未被取走的消息”,这个消息在硬盘上存储的时候,就是当作 “未被取走”。

 

相关文章:

根据源码,模拟实现 RabbitMQ - 内存数据管理(4)

目录 一、内存数据管理 1.1、需求分析 1.2、实现 MemoryDataCenter 类 1.2.1、ConcurrentHashMap 数据管理 1.2.2、封装交换机操作 1.2.3、封装队列操作 1.2.4、封装绑定操作 1.2.5、封装消息操作 1.2.6、封装未确认消息操作 1.2.7、封装恢复数据操作 一、内存数据管理…...

Apache Flume架构和原理

Apache Flume是一个开源的分布式、可靠的日志收集和聚合系统,旨在将大量的日志数据从不同的数据源(如应用程序、服务器、设备)收集到中心存储或数据湖中。Flume的架构设计允许用户在大规模数据流的情况下实现可靠的数据传输和处理。 Flume特性 Apache Flume是一个用于收集…...

代码随想录算法训练营day38 | LeetCode 509. 斐波那契数 70. 爬楼梯 746. 使用最小花费爬楼梯

509. 斐波那契数&#xff08;题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台&#xff09; 思路&#xff1a;经典的dp题。 int fib(int n){if(n 0 || n 1) return n;return fib(n-1) fib(n-2); } 70. 爬楼梯&#xff08;题目…...

Linux基本指令【下】

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析3 目录 &#x1f449;&#x1f3fb;cat&#x1f449;&#x1f3fb;echo&#xff08;输出…...

向量检索:基于ResNet预训练模型构建以图搜图系统

1 项目背景介绍 以图搜图是一种向量检索技术&#xff0c;通过上传一张图像来搜索并找到与之相关的其他图像或相关信息。以图搜图技术提供了一种更直观、更高效的信息检索方式。这种技术应用场景和价值非常广泛&#xff0c;经常会用在商品检索及购物、动植物识别、食品识别、知…...

SpringBoot 响应头添加版本号、打包项目后缀添加版本号和时间

文章目录 响应头添加版本号获取版本号添加响应处理器请求结果 打包项目后缀添加版本号和时间实现打包结果 响应头添加版本号 获取版本号 在 pom.xml 中&#xff0c;在 project.version 下定义版本号 在 application.yml 获取 pom.xml 中 project.version 中的信息 添加响应处…...

优化指南:带宽限制的可行策略

大家好&#xff01;作为一名专业的爬虫程序员&#xff0c;我们经常面临的一个挑战就是带宽限制。尤其是在需要快速采集大量数据时&#xff0c;带宽限制成为了我们提升爬虫速度的一大阻碍。今天&#xff0c;我将和大家分享一些解决带宽限制的可行策略&#xff0c;希望能帮助大家…...

计算机提示mfc120u.dll缺失(找不到)怎么解决

在计算机领域&#xff0c;mfc120u.dll是一个重要的动态链接库文件。它包含了Microsoft Foundation Class (MFC) 库的特定版本&#xff0c;用于支持Windows操作系统中的应用程序开发。修复mfc120u.dll可能涉及到解决与该库相关的问题或错误。这可能包括程序崩溃、运行时错误或其…...

Java基于SpringBoot+Vue实现酒店客房管理系统(2.0 版本)

文章目录 一、前言介绍二、系统结构三、系统详细实现3.1用户信息管理3.2会员信息管理3.3客房信息管理3.4收藏客房管理3.5用户入住管理3.6客房清扫管理 四、部分核心代码 博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W,Csdn博客专家、掘金/华为云/阿里云…...

微服务架构2.0--云原生时代

云原生 云原生&#xff08;Cloud Native&#xff09;是一种关注于在云环境中构建、部署和管理应用程序的方法和理念。云原生应用能够最大程度地利用云计算基础设施的优势&#xff0c;如弹性、自动化、可伸缩性和高可用性。这个概念涵盖了许多方面&#xff0c;包括架构、开发、…...

C++day2作业(2023.8.22)

1.定义一个学生的结构体&#xff0c;包含学生的姓名&#xff0c;年龄&#xff0c;成绩&#xff0c;性别&#xff0c;学生的成绩&#xff0c;姓名&#xff0c;定义为私有权限&#xff1b;定义一个学生类型的结构体变量&#xff0c;设置公有函数用于给学生的成绩和名字进行赋值&a…...

在 Spring Boot 中使用 OpenAI ChatGPT API

1、开始咯 我们来看看如何在 Spring Boot 中调用 OpenAI ChatGPT API。 我们将创建一个 Spring Boot 应用程序&#xff0c;该应用程序将通过调用 OpenAI ChatGPT API 生成对提示的响应。 2、OpenAI ChatGPT API 在开始具体讲解之前&#xff0c;让我们先探讨一下我们将在本教…...

【leetcode】225.用队列实现栈

分析&#xff1a; 队列遵循先入先出的原则&#xff0c;栈遵循后入先出的原则 也就是说&#xff0c;使用队列实现栈时&#xff0c;入队操作正常&#xff0c;但是出队要模拟出栈的操作&#xff0c;我们需要访问的是队尾的元素&#xff1b;题目允许使用两个队列&#xff0c;我们可…...

机器学习中XGBoost算法调参技巧

本文将详细解释XGBoost中十个最常用超参数的介绍&#xff0c;功能和值范围&#xff0c;及如何使用Optuna进行超参数调优。 对于XGBoost来说&#xff0c;默认的超参数是可以正常运行的&#xff0c;但是如果你想获得最佳的效果&#xff0c;那么就需要自行调整一些超参数来匹配你…...

第1章:计算机网络体系结构

文章目录 1.1 计算机网络 概述1.概念2.组成3.功能4.分类5.性能指标1.2 计算机网络 体系结构&参考模型1.分层结构2.协议、接口、服务3.ISO/OSI模型4.TCP/IP模型1.1 计算机网络 概述 1.概念 2.组成 1.组成部分&...

【Java 动态数据统计图】动态数据统计思路Demo(动态,排序,containsKey)三(115)

上代码&#xff1a; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map;public class day10 {public static void main(String[] args) {List<Map<String,O…...

【游戏评测】河洛群侠传一周目玩后感

总游戏时长接近100小时&#xff0c;刚好一个月。 这两天费了点劲做了些成就&#xff0c;刷了等级&#xff0c;把最终决战做了。 总体感觉还是不错的。游戏是开放世界3D游戏&#xff0c;Unity引擎&#xff0c;瑕疵很多&#xff0c;但胜在剧情扎实&#xff0c;天赋系统、秘籍功法…...

java新特性之Lambda表达式

函数式编程 关注做什么&#xff0c;不关心是怎么实现的。为了实现该思想&#xff0c;java有了一种新的语法格式&#xff0c;Lambda表达式。Lambda本质是匿名内部类对象&#xff0c;是一个函数式接口。函数式接口表示接口内部只有一个抽象方法。使用该语法可以大大简化代码。 …...

【考研数学】线形代数第三章——向量 | 2)向量组相关性与线性表示的性质,向量组的等价、极大线性无关组与秩

文章目录 引言二、向量组的相关性与线性表示2.3 向量组相关性与线性表示的性质 三、向量组等价、向量组的极大线性无关组与秩3.1 基本概念 写在最后 引言 承接前文&#xff0c;我们来学习学习向量组相关性与线性表示的相关性质 二、向量组的相关性与线性表示 2.3 向量组相关性…...

Java中调用Linux脚本

在Java中&#xff0c;可以使用ProcessBuilder类来调用Linux脚本。以下是一个简单的示例&#xff0c;展示了如何在Java中调用Linux脚本&#xff1a; 创建一个Linux脚本文件&#xff08;例如&#xff1a;myscript.sh&#xff09;&#xff0c;并在其中编写需要执行的命令。确保脚…...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式&#xff0c;可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?

在建筑行业&#xff0c;项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升&#xff0c;传统的管理模式已经难以满足现代工程的需求。过去&#xff0c;许多企业依赖手工记录、口头沟通和分散的信息管理&#xff0c;导致效率低下、成本失控、风险频发。例如&#…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...