当前位置: 首页 > news >正文

参考RabbitMQ实现一个消息队列

文章目录

  • 前言
  • 小小消息管家
    • 1.项目介绍
    • 2. 需求分析
      • 2.1 API
      • 2.2 消息应答
      • 2.3 网络通信协议设计
    • 3. 开发环境
    • 4. 项目结构介绍
      • 4.1 配置信息
    • 5. 项目演示

前言

消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现解耦合以及削峰填谷

在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大,如果A或者B服务器挂了,我们业务也就崩溃了。引入消息队列之后我们将请求和响应都通过消息队列这个中间人来传递,就降低了耦合度。

同样的,如果我们AB服务器直接进行通信,如果A服务器突然发送许多请求,我们B服务器也会收到巨多请求的影响,AB由于硬件资源限制可能都会崩溃。如果我们引入消息队列,消息队列可以将许多请求接收存储下来,B服务器依然可以按照原有节奏取请求,不会一下子接收大量请求。这也就是我们所说的削峰填谷,也是类似的,就算请求过少,我们的服务器依然可以从消息队列中取出挤压得请求。

在分布式系统中,跨主机之间使用生产者消费者模型就显得非常重要了。 我们通常把阻塞队列封装成⼀个独立的服务器程序,并且新增一些功能。这样的程序我们就称为 消息队列

所以此次就仿照市面上得RabbitMQ实现一个简易的消息队列。

小小消息管家

1.项目介绍

将消息队列分成服务器模块、客户端模块、公共模块来实现。

  1. 服务器模块通过虚拟主机实现交换机、队列、绑定、消息等相关操作的隔离。虚拟主机主要负责对上述内容的数据进行硬盘(数据库和文件)以及内存管理、消息的三种转发方式如Direct、Fanout、Topic、为提供客户端API的实现。
  2. 客户端模块:实现连接管理提供建立连接、信道管理通过信道实现TCP连接的复用。在信道管理中实现客户端调用的API。
  3. 公共模块:约定客户端服务器的通信协议、数据传输过程中的序列化以及反序列化。

在这里插入图片描述

2. 需求分析

由于是实现一个生产者消费者模型,在消息队列中我们要实现的逻辑如下:我们的生产者客户端向服务器发送消息,消费者客户端向服务器订阅消息,服务器负责消息的存储和转发。

在这里插入图片描述

概念解读:

  • 虚拟主机 (VirtualHost):是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
  • 交换机 (Exchange):生产者把消息先发送到 BrokerServer 的 Exchange 上。 再根据不同的规则把消息转发给不同的 Queue。
  • 队列 (Queue):真正用来存储消息,每个消费者决定自己从哪个 Queue 上读取消息。
  • 绑定 (Binding):Exchange 和 Queue 之间的关联关系。Exchange 和 Queue 可以理解成 “多对多” 关
    系。
  • 消息 (Message): 传递的内容。

2.1 API

我们的服务器提供以下API是西安消息队列的基本功能。

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

我们的客户端提供以下API供客户使用消息队列,为了复用TCP连接,我们提供了一个Channel逻辑通道。所以在客户端我们还需要提供Channel的创建和关闭:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

2.2 消息应答

被消费者消费的消息,需要进行应答来确定我们消费者正确消费了消息。我们设置两种应答模式:

  1. 自动应答:消费者只要消费了消息,就算应答完毕。Broker直接删除这个消息。
  2. 手动应答:消费者手动调用应答接口(确认消息),Broker收到应答请求后删除这个消息。

2.3 网络通信协议设计

我们使用TCP 协议,来作为通信的底层协议。在这个基础上自定义应用层协议,实现客户端对服务器提供的功能远程调用。所以我们在协议中约定type标记调用的功能,length为消息的长度,payload为消息的具体内容。

请求和响应的格式如下:
在这里插入图片描述

其中 type取值如下:
• 0x1 创建 channel
• 0x2 关闭 channel
• 0x3 创建 exchange
• 0x4 销毁 exchange
• 0x5 创建 queue
• 0x6 销毁 queue
• 0x7 创建 binding
• 0x8 销毁 binding
• 0x9 发送 message
• 0xa 订阅 message
• 0xb 返回 ack
• 0xc 服务器给客户端推送的消息(被订阅的消息)

其中 payload 部分,会根据不同的 type,存在不同的格式。对于请求来说, payload 表示这次方法调用的各种参数信息,我们定义对应的类实现。
对于响应,payload 表示这次方法调用的返回值。

3. 开发环境

数据库:SQLite
开发语言:Java
技术框架:SpringBoot、SpringMVC、Mybatis
管理工具:Maven
开发工具:Intellij IDEA 2020.1.4
操作系统:Windows10

4. 项目结构介绍

在这里插入图片描述

  • common 包中约定通信协议包括请求响应格式以及不同的payload对应的数据格式;创建自定义异常类;实现序列化反序列化;定义消费者以及处理消息调用的函数接口。
  • demo 创建了一个消费者和生产者用于测试项目。
  • mqclient 客户端模块,提供创建连接的工厂类;定义完整连接的内容;定义channel实现客户端api
  • mqserver.core 定义了交换机、队列、消息、交换机类型、转发规则、消费消息的逻辑。
  • mqserver.datacenter 定义了交换机、队列、消息、绑定的存储以及管理。
  • mqserver.mapper:实现对sqlite数据库的操作。

4.1 配置信息

由于我们对于数据库的存储只涉及一小部分,所以此处我们利用sqlite进行数据管理。

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.14</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq</artifactId><version>0.0.1-SNAPSHOT</version><name>mq</name><description>mq</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.3.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter-test</artifactId><version>2.3.1</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

application.yml: 配置数据库信息
在这里插入图片描述

5. 项目演示

创建一个生产者客户端向服务器发送一个消息:

package com.example.mq.demo;import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** 这个类用来表示一个生产者.*  通常这是一个单独的服务器程序.*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");//创建一个连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);//得到逻辑链接,这个就类似于socketConnection connection = factory.newConnection();Channel channel = connection.createChannel();// 通过逻辑连接创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello 欢迎消费消息".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}

创建一个消费者客户端,订阅消费消息

package com.example.mq.demo;import com.example.mq.common.Consumer;
import com.example.mq.common.MqException;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** @author zq* @date 2023-08-05 19:29*//** 这个类表示一个消费者.* 通常这个类也应该是在一个独立的服务器中被执行*/
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);//订阅消息,并定义如何消费消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 通过这个循环模拟一直等待消费完生产者生产的所有消息while (true) {Thread.sleep(500);}}
}

实现结果如下 :

在这里插入图片描述
在这里插入图片描述

通过结果我们可以看出,我们消费者成功取出订阅的队列中的消息进行消费

相关文章:

参考RabbitMQ实现一个消息队列

文章目录 前言小小消息管家1.项目介绍2. 需求分析2.1 API2.2 消息应答2.3 网络通信协议设计 3. 开发环境4. 项目结构介绍4.1 配置信息 5. 项目演示 前言 消息队列的本质就是阻塞队列&#xff0c;它的最大用途就是用来实现生产者消费者模型&#xff0c;从而实现解耦合以及削峰填…...

SpringBoot+JWT

一、maven坐标 <!-- JWT依赖 --><dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.9.1</version></dependency><dependency><groupId>com.auth0</groupId>&…...

Cad二次开发EqualPoint

在 CAD 软件的二次开发中&#xff0c;Tolerance.Global.EqualPoint 是一个特定的属性或方法&#xff0c;用于表示全局的相等性公差值。这个属性或方法通常是由 CAD 软件的开发平台或 API 提供的&#xff0c;用于处理浮点数的相等性比较。 具体来说&#xff0c;Tolerance.Globa…...

20230806将ASF格式的视频转换为MP4

20230806将ASF格式的视频转换为MP4 2023/8/6 18:47 缘起&#xff0c;自考中山大学的《计算机网络》&#xff0c;考试《数据库系统原理》的时候找到视频&#xff0c;由于个人的原因&#xff0c;使用字幕更加有学习效率&#xff01; 由于【重型】的PR2023占用资源较多&#xff0c…...

【MySQL】——常用接口API即相关函数说明

目录 1、MySQL结构体的说明 1、MYSQL结构体 2.MYSQL_RES结构体 3. MYSQL_FIELD 2. 接口的使用步骤 3、mysql_init()——MYSQL对象初始化 4、mysql_real_connect()——数据库引擎建立连接 5. mysql_query()——查询数据库某表内容 6、mysql_real_query——执行SQL语句 …...

ts + axios + useRequest (ahooks)—— 实现请求封装

现在越来越多的项目开始ts化&#xff0c;我们今天就一块学习一下&#xff0c;关于ts的请求封装。 首先要安装两个依赖&#xff1a; npm i axios -S npm i ahooks -S 引入&#xff1a; import { useRequest } from ahooks; import axios, { AxiosRequestConfig, AxiosRespo…...

Springboot @Validated注解详细说明

在Spring Boot中&#xff0c;Validated注解用于验证请求参数。它可以应用在Controller类或方法上 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId> </depen…...

STM32初学者,到底选标准库还是HAL库?

当初学者尝试学习STM32开发时&#xff0c;通常会面临一个关键的选择&#xff1a;是选择STM32的标准库&#xff0c;还是HAL库&#xff1f;这两个库各自有着优势与适用场景&#xff0c;本文将从多个角度分析&#xff0c;帮助初学者更好地选择适合自己的库。 在开始之前&#xff…...

小学生作业随机加减乘除运算计算习题答案 html源码

小学生作业随机加减乘除运算计算习题答案 html源码 这道题目提供了多种选项,包括运算符和输入的运算数范围。题目数量也可以选择。如果你选择好了选项,就可以点击出题按钮进行练习。 为了方便,题目答案可以打印出来。但是,如果隐藏了横线,就会去除等号后面的下划线。推荐使用…...

nvm下载安装配置

一、作用 nvm是node版本管理的工具&#xff0c;具有管理、下载、切换node版本等能力。经常不同项目需要依赖不同版本的node&#xff0c;此时nvm就能有效的解决node版本切换的问题。 二、nvm下载安装配置 &#xff08;1&#xff09;安装包地址 https://github.com/coreybutl…...

2023-08-07力扣每日一题

链接&#xff1a; 344. 反转字符串 题意&#xff1a; 如题 解&#xff1a; 初级算法做过的题啊-感觉这几天重复题还蛮多的 实际代码&#xff1a; #include<iostream> #include<vector> #include<algorithm> using namespace std; /* void reverseStri…...

uni——不规则tab切换(skew)

案例展示 案例代码 <!-- 切换栏 --> <view class"tabBoxs"><view class"tabBox"><block v-for"(item,index) in tabList" :key"index"><view class"tabItem":class"current item.id&…...

Docker安装Grafana以及Grafana应用

Doker基础 安装 1、 卸载旧的版本 sudo yum remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-engine 2、需要的安装包 sudo yum install -y yum-utils 3、设置镜像的仓库 yum-config-m…...

OpenSource - 分布式重试平台

文章目录 概述重试方案对比设计思想流量管理平台预览场景应用强通知场景发送MQ场景回调场景异步场景 概述 在当前广泛流行的分布式系统中&#xff0c;确保系统数据的一致性和正确性是一项重大挑战。为了解决分布式事务问题&#xff0c;涌现了许多理论和业务实践&#xff0c;其…...

oracle稳定执行计划

二、稳定执行计划 &#xff08;一&#xff09;sql profile的好处 稳定执行计划 在不能修改目标sql的sql文本的情况下使目标sql语句按照指定的执行计划运行。 1、automatic类型的sql profile 本质是针对目标sql的一些额外的调整信息&#xff0c;这些额外的调整信息需要与原目标s…...

docker安装neo4j

参考文章&#xff1a; 1、Mac 本地以 docker 方式配置 neo4j_neo4j mac docker_Abandon_first的博客-CSDN博客 2、https://www.cnblogs.com/caoyusang/p/13610408.html 安装的时候&#xff0c;参考了以上文章。遇到了一些问题&#xff0c;记录下自己的安装过程&#xff1a; …...

第十五章 定义 HL7 的 DTL 数据转换

文章目录 第十五章 定义 HL7 的 DTL 数据转换 第十五章 定义 HL7 的 DTL 数据转换 每个接口可能需要一定数量的数据转换。创建转换时&#xff0c;不要使用保留的包名称。 重要提示&#xff1a;请勿在数据转换中手动更改 HL7 转义序列&#xff1b;自动处理这些。 可以使用“数…...

【笔记】移动光猫改桥接

1. 登录后台 移动光猫的超管和密码&#xff08;百度的&#xff09; 账号&#xff1a;CMCCAdmin 密码&#xff1a;aDm8H%MdA 浏览器访问 192.168.1.1 并登录 2. 选择连接 点击“网络”&#xff0c;在“连接名称”下拉框选择 INTENET_R_VID 字样的连接&#xff0c;并截图备…...

网络安全进阶学习第十四课——MSSQL注入

文章目录 一、MSsql数据库二、MSsql结构三、MSsql重点表1、master 数据库中的Sysdatabases 表2、Sysobjects 表3、Syscolumns 表 四、Mssql常用函数五、Mssql的报错注入六、Mssql的盲注常用以下函数进行盲注&#xff1a; 七、联合注入1、获取当前表的列数2、获取当前数据库名3、…...

【C语言】初阶结构体

&#x1f388;个人主页&#xff1a;库库的里昂 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 ✨收录专栏&#xff1a;C语言初阶 ✨其他专栏&#xff1a;代码小游戏 &#x1f91d;希望作者的文章能对你有所帮助&#xff0c;有不足的地方请在评论…...

24届近5年南京理工大学自动化考研院校分析

今天学长给大家带来的是南京理工大学控制考研分析 满满干货&#xff5e;还不快快点赞收藏 一、南京理工大学 ​ 学校简介 南京理工大学是隶属于工业和信息化部的全国重点大学&#xff0c;学校由创建于1953年的新中国军工科技最高学府——中国人民解放军军事工程学院&#xf…...

5.PyCharm基础使用及快捷键

在前几篇文章中介绍了PyCharm的安装和汉化,本篇文章一起来看一下PyCharm的基本用法和一些快捷键的使用方法。 本篇文章PyCharm的版本为PyCharm2023.2 新建项目和运行 打开工具,在菜单中——文件——新建项目 选择项目的创建位置(注意最好不要使用中文路径和中文名项目名称…...

RabbitMQ的安装

RabbitMQ的安装 1、Windows环境下的RabbitMQ安装步骤 使用的版本&#xff1a;otp_win64_23.2 rabbitmq-server-3.8.16 版本说明&#xff1a;https://www.rabbitmq.com/which-erlang.html#compatibility-matrix 1.1 下载并安装erlang RabbitMQ 服务端代码是使用并发式语言…...

GPU版PyTorch对应安装教程

一、正确安装符合自己电脑的对应GPU版本的PyTorch之前需要了解三个基本概念 算力、CUDA driver version、CUDA runtime version ①算力&#xff1a;需要先知道你的显卡&#xff0c;之后根据官网表格进行对应&#xff0c;得到算力 ②CUDA driver version&#xff1a;电脑上显卡…...

医学影像PACS临床信息系统源码

医学影像临床信息系统&#xff08;Picture Archiving and Communication Systems&#xff09;PACS是指从医疗影像设备中获得数字影像&#xff0c;利用高速网络进行存储、管理、传输的医疗影像信息管理系统。通过该系统&#xff0c;能实现影像数字化、无胶片化管理。 登记系统 …...

Python(Web时代)——jinja2模板

简介 Jinja2是Flask框架默认支持的模板引擎&#xff0c;是python的web项目中被广泛应用的一种模板引擎&#xff0c;jinja2的作者与Flask是同一个人。 jinja2具有以下特点&#xff1a; 非常灵活&#xff0c;提供了控制结构、表达式与继承等 性能好 可读性强 渲染一个模板&a…...

酷开系统 | 酷开科技,让数据变得更有价值!

身处信息时代&#xff0c;我们每个人时刻都在生成、传递和应用数据&#xff0c;数据已经成为了现代社会中宝贵的资源之一&#xff0c;而在人工智能领域&#xff0c;数据更是被称为人工智能的“燃料”。 而在AI的发展中&#xff0c;只有拥有高质量、多样性且充分代表性的数据集…...

uni——tab切换

案例展示 案例代码 <view class"tablist"><block v-for"(item,index) in tabList" :key"index"><view class"tabItem" :class"current item.id?active:" click"changeTab(item)">{{item.nam…...

类图的6种关系和golang应用

文章目录 1. 依赖和关联1.1 依赖&#xff08;Dependency&#xff09;概念类图示例代码示例 1.2 关联&#xff08;Association&#xff09;概念类图示例代码示例 2. 组合和聚合&#xff08;特殊的关联关系&#xff09;2.1 聚合&#xff08;Aggregation&#xff09;概念类图示例代…...

Linux tar 备忘清单

tar 备忘清单 语法选项创建一个 tar 格式的压缩文件创建压缩后的 tar.gz 存档文件生成压缩率更高的 tar.bz2 文件解压缩 tar 文件解压缩 tar.gz 文件解压缩 tar.bz2 文件列出归档内容从 tar 归档文件中提取单个文件从 tar 归档文件中提取多个文件使用通配符提取文件组添加文件或…...