RabbitMQ实践——搭建多人聊天服务
大纲
- 用户登录
- 创建聊天室
- 监听Stream(聊天室)
- 发送消息
- 实验
- 登录
- Tom侧
- Jerry侧
- 创建聊天室
- Jerry侧
- Tom侧
- 进入聊天室
- Jerry侧
- Tom侧
- 发送消息
- Jerry发送消息
- Jerry侧聊天室
- Tom侧聊天室
- Tom发送消息
- Jerry侧聊天室
- Tom侧聊天室
- 代码工程
- 参考资料
在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。

如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。
这是因为Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息,以供不同消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的不同点是:Stream队列并不会清除消息。消息会一直存在于Stream队列中,消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息,所有消费者都从队列中读取消息即可。

用户登录
关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。
创建聊天室
我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。
package com.rabbitmq.chat.service;import java.util.Collections;
import java.util.Date;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import reactor.core.publisher.Flux;@Service
public class ChatRoomV2 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createChatRoom(String admin, String roomName) {createChatRoom(roomName);}private void createChatRoom(String roomName) {rabbitTemplate.execute(action -> {action.exchangeDeclare(roomName, "fanout", false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));action.queueBind(roomName, roomName, "");return null;});}
聊天室创建完毕后,会通知所有登录的用户。
@PostMapping("/create")public void create(@RequestParam String admin, @RequestParam String roomName) {chatRoomV2.createChatRoom(admin, roomName);core.notifyEveryone(roomName + " is created");}
监听Stream(聊天室)
public Flux<String> receive(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute(channel -> {channel.basicQos(100);Date timestamp = new Date(System.currentTimeMillis());channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", timestamp),(consumerTag, message) -> {String senderOfMessage = message.getProperties().getHeaders().get("username").toString();String show = "You Said: ";if (!senderOfMessage.equals(username)) {show = senderOfMessage + " Said: ";}show += new String(message.getBody());System.out.println(show);emitter.next(show);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}
我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取历史消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标志了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理
@GetMapping(value = "/receive", produces = "text/event-stream")public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {return chatRoomV2.receive(username, roomName);}
发送消息
每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。
public void send(String username, String roomName, String message) {Message msg = MessageBuilder.withBody(message.getBytes()).setHeader("username", username).build();rabbitTemplate.send(roomName, "", msg);}
实验
登录
Tom侧

Jerry侧

创建聊天室
Jerry侧
Jerry申请创建一个聊天室

在管理后台,我们看到对应的交换器和Stream都创建出来了。


同时在刚才的登录接口界面,Jerry收到了通知

Tom侧
Tom也会收到通知

进入聊天室
Tom和Jerry在收到通知后,可以通过receive接口进入聊天室,监听聊天室内容变化。
Jerry侧

Tom侧

发送消息
Jerry发送消息

Jerry侧聊天室

Tom侧聊天室

Tom发送消息

Jerry侧聊天室

Tom侧聊天室

代码工程
https://github.com/f304646673/RabbitMQDemo
参考资料
- https://www.rabbitmq.com/docs/streams
相关文章:
RabbitMQ实践——搭建多人聊天服务
大纲 用户登录创建聊天室监听Stream(聊天室)发送消息实验登录Tom侧Jerry侧 创建聊天室Jerry侧Tom侧 进入聊天室Jerry侧Tom侧 发送消息Jerry发送消息Jerry侧聊天室Tom侧聊天室 Tom发送消息Jerry侧聊天室Tom侧聊天室 代码工程参考资料 在《RabbitMQ实践——…...
git分布式版本控制系统
Git - Downloads (git-scm.com) gitee教程(超全,超详细,超长)-CSDN博客 Git教程 - 廖雪峰的官方网站 (liaoxuefeng.com) 所有的版本控制系统,其实只能跟踪文本文件改动,比如TXT文件,网页&…...
基于weixin小程序的民宿短租系统的设计与实现
管理员账户功能包括:系统首页,个人中心,房主管理,房间类型管理,用户管理,民宿信息管理,民宿预订管理,系统管理 小程序功能包括:系统首页,民宿信息,…...
2024-06-22力扣每日一题
链接: 2663. 字典序最小的美丽字符串 题意 略 解: 要求字符串内不存在任何长度为 2 或更长的回文子字符串,则在任意位置不存在aa或aba形式 由于要被给定字符串字典序大,且找到符合条件的字典序最小字符串,则竟可…...
S_LOVE多端恋爱小站小程序源码 uniapp多端
S_LOVE多端恋爱小站小程序源码,采用uniapp多端开发框架进行开发,目前已适配H5、微信小程序版本。 源码下载:https://download.csdn.net/download/m0_66047725/89421726 更多资源下载:关注我。...
如何避免MySQL的死锁或性能下降
1、按顺序访问数据 确保多个线程或事务在访问多个表或行时,按照相同的顺序进行。这可以避免循环等待和资源竞争,从而降低死锁的风险。 2、避免长时间持有锁 尽量缩短事务的执行时间,避免长时间持有锁。长时间持有锁会增加其他事务等待的时…...
《C语言》编译和链接
文章目录 一、翻译环境1、预处理2、编译3、汇编4、链接 二、运行环境 一、翻译环境 在使用编译器编写代码时,编写的代码是高级语言,机器无法直接识别和运行,在编译器内部会翻译成机器可执行的机器语言。 编译环境由编译和链接两大过程组成。 …...
group by和select的兼容性问题
group by和select的兼容性问题 在标准的SQL语法中,GROUP BY 和 SELECT 之间不存在兼容性问题,因为它们是 SQL 查询语句的基本组成部分,而且它们的使用方式是相互兼容的。 SELECT 子句和 GROUP BY 子句的关系: SELECT 子句&#…...
切面aspect处理fegin调用转本地调用
切面处理fegin调用转本地调用 问题:原fegin调用转本地调用详细描述方案代码实现总结问题:原fegin调用转本地调用 项目原来是微服务项目服务与服务之间是通过fegin进行交互的,但是现在微服务项目要重构为单体项目,原fegin调用的方法要给为本地调用 详细描述 zyy-aiot │ …...
Linux 磁盘挂载与分区
Linux 磁盘挂载与分区 vda1: 其中vd表示虚拟磁盘,a表示第一块磁盘,b表示第二块磁盘,1表示第一块磁盘的第一分区(显然两块磁盘都只有一个分区)图中可以看到,vda1磁盘只有一个分区,且全部挂载到根…...
Open3D 将ShapeNet数据集txt转pcd
目录 一、概述 二、代码实现 三、实现效果 一、概述 ShapeNet 数据集是一个广泛使用的三维物体数据集,主要用于计算机视觉、计算机图形学、机器人学和机器学习等领域的研究。它包含大量的三维物体模型,并附有丰富的标注信息。ShapeNet 数据集由普林斯…...
综合项目实战--jenkins节点模式
一、DevOps流程 DevOps是一种方法论,是一系列可以帮助开发者和运维人员在实现各自目标的前提下,向自己的客户或用户交付最大化价值及最高质量成果的基本原则和实践,能让开发、测试、运维效率协同工作的方法。 DevOps流程(自动化测试部分) DevOps完整流程 二、gitee+j…...
WhaleStudio 2.6重磅发布!调度模块WhaleScheduler更新78项核心功能
我们很高兴地宣布WhaleStudio 2.6版本的正式发布!新版本中包含了数据调度模块WhaleScheduler和数据集成模块WhaleTunnel的百余项核心功能更新,本文摘选了WhaleScheduler常用功能更新的概况,关于WhaleTunnel的更新详情将于近期发布,…...
笔记101:OSQP求解器的底层算法 -- ADMM算法
前言1:这篇博客仅限于介绍拉格朗日乘子法,KKT条件,ALM算法,ADMM算法等最优化方法的使用以及简版代码实现,但不会涉及具体的数学推导;不过在下面我会给出具体数学推导的相关文章和截图,供学有余力…...
Java银系统/超市收银系统/智慧新零售/ERP进销存管理/线上商城/h5/小程序
>>>系统简述: 神点收银系统支持B2B2C多商户模式,系统基于前后端分离的架构,后端采用Java SpringBoot Mysql Mybatis Plus,前端基于当前流行的Uniapp、Element UI,支持小程序、h5。架构包含:会员端…...
大学网页制作作品1
作品须知:1.该网页作品预计分为5个页面(其中1个登录页面,1个首页主页面,3个分页面),如需要可自行删改增加页面。(总共约800行html,1200行css,100行js) 2.此网页源代码只用于学习和模…...
【会议征稿,IEEE出版】第三届机器人、人工智能与智能控制国际会议(RAIIC 2024,7月5-7)
第三届机器人、人工智能与智能控制国际会议(RAIIC 2024)将于2024年7月5-7日中国绵阳举行。 RAIIC 2024是汇聚业界和学术界的顶级论坛,会议将邀请国内外著名专家就以传播机器人、人工智能与智能控制领域的技术进步、研究成果和应用做专题报告…...
离线部署OpenIM
目录 1.提取相关安装包和镜像 2.安装docker和docker-compose 3.依次导入镜像 4.解压安装包 5.执行安装命令 6.PC Web 验证 7.开放端口 7.1IM 端口 7.2Chat 端口 7.3 PC Web 及管理后台前端资源端口 “如果您在解决类似问题时也遇到了困难,希望我的经验分享…...
sql:between and日期毫秒精度过多导致的查询bug
复现 一般情况下,前端传的日期值大多都是yyyy-MM-dd HH:mm:ss(标准格式),比如2024-06-25 10:49:50,但是在测试环境,测试人员测出了一个带毫秒的日期:比如2024-06-25 10:49:50.9999999 这种情况下会出现查询bug SELEC…...
【日常记录】【JS】优雅检测用户是否在指定元素的外部点击
文章目录 1、界面基本布局2、代码实现3、参考链接 1、界面基本布局 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0">…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
java 实现excel文件转pdf | 无水印 | 无限制
文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
基于TurtleBot3在Gazebo地图实现机器人远程控制
1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...
JVM虚拟机:内存结构、垃圾回收、性能优化
1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...
快刀集(1): 一刀斩断视频片头广告
一刀流:用一个简单脚本,秒杀视频片头广告,还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农,平时写代码之余看看电影、补补片,是再正常不过的事。 电影嘛,要沉浸,…...
Web后端基础(基础知识)
BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器,应用程序的逻辑和数据都存储在服务端。 优点:维护方便缺点:体验一般 CS架构:Client/Server,客户端/服务器架构模式。需要单独…...
