当前位置: 首页 > news >正文

RabbitMQ实践——搭建单人聊天服务

大纲

  • 创建Core交换器
  • 用户登录
  • 发起聊天邀请
  • 接受邀请
  • 聊天
  • 实验过程
  • 总结
  • 代码工程

经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
在这里插入图片描述
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。

创建Core交换器

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class Core {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;final String exchangeName = "Core";@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();createExchange(exchangeName);}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}

用户登录

用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。

    private final ReentrantLock lock = new ReentrantLock();final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();public Flux<String> Login(String username) {createExclusiveQueue(username);createBanding(exchangeName, username, username);return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(username, (Message message) -> {String msg = new String(message.getBody());System.out.println("Received message: " + msg);emitter.next(msg);});container.start();});}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}private void createBanding(String exchangeName, String queueName, String routingKey) {rabbitTemplate.execute(channel -> {channel.queueBind(queueName, exchangeName, routingKey);return null;});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

Controller如下

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate Core core;@PostMapping(value = "/login", produces = "text/event-stream")public Flux<String> login(@RequestParam String username) {return core.Login(username);}
}

发起聊天邀请

发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;@Service
public class ChatRoom {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;@Dataprivate class ChatRoomInfo {private String exchange;private Map<String, String> usernameToQueuename;}private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();private final ReentrantLock lock = new ReentrantLock();   @PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> invite(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {createChatRoom(fromUsername, toUsername);}return talk(chatRoomName, fromUsername);}private void createChatRoom(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);String exchangeName = chatRoomName;String fromQueueName = "queue-" + fromUsername + "-" + toUsername;String toQueueName = "queue-" + toUsername + "-" + fromUsername;rabbitTemplate.execute(action -> {action.exchangeDeclare(exchangeName, "fanout", false, true, null);action.queueDeclare(fromQueueName, false, true, false, null);action.queueDeclare(toQueueName, false, true, false, null);action.queueBind(fromQueueName, exchangeName, "");action.queueBind(toQueueName, exchangeName, "");return null;});lock.lock();try {ChatRoomInfo chatRoomInfo = new ChatRoomInfo();chatRoomInfo.setExchange(exchangeName);chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));chatRooms.put(chatRoomName, chatRoomInfo);} finally {lock.unlock();}}

接受邀请

被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。

    public Flux<String> accept(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);return talk(chatRoomName, toUsername);}private Flux<String> talk(String chatRoomName, String username) {ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}String queueName = chatRoomInfo.getUsernameToQueuename().get(username);return Flux.create(emitter -> {SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener((Message message) -> {String msg = new String(message.getBody());System.out.println(username + " received message: " + msg);emitter.next(msg);});listener.start();});}

聊天

聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。

    public void chat(String fromUsername, String toUsername, String message) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {chatRoomName = getChatRoomName(toUsername, fromUsername);chatRoomInfo = chatRooms.get(chatRoomName);}if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);}private String getChatRoomName(String fromUsername, String toUsername) {return fromUsername + "-" + toUsername + "-chat-room";}

Controller侧代码

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/chat")
public class ChatController {@Autowiredprivate Core core;@Autowiredprivate ChatRoom chatRoom;@PutMapping(value = "/invite", produces = "text/event-stream")public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {core.invite(fromUsername, toUsername);return chatRoom.invite(fromUsername, toUsername);}@PutMapping(value = "/accept", produces = "text/event-stream")public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {core.accept(fromUsername, toUsername);return chatRoom.accept(fromUsername, toUsername);}@PostMapping("/send")public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {chatRoom.chat(fromUsername, toUsername, message);}
}

实验过程

在Postman中,我们先让tom登录,然后jerry登录。
在这里插入图片描述
在这里插入图片描述
在后台,我们看到创建两个队列
在这里插入图片描述
以及Core交换器的绑定关系也被更新
在这里插入图片描述
Jerry向Tom发起聊天邀请
在这里插入图片描述
可以看到Tom收到了邀请
在这里插入图片描述
同时新增了两个队列
在这里插入图片描述
以及一个交换器
在这里插入图片描述
在这里插入图片描述
Tom通过下面请求接受邀请
在这里插入图片描述
Jerry收到Tom接受了邀请的通知
在这里插入图片描述
后面它们就可以聊天了
在这里插入图片描述
在这里插入图片描述
它们的聊天窗口都收到了消息
在这里插入图片描述
在这里插入图片描述

总结

本文主要使用的知识点:

  • direct交换器以及其绑定规则
  • fanout交换器
  • 自动删除的交换器
  • 自动删除的队列
  • 只有一个消费者的队列
  • WebFlux响应式编程

代码工程

https://github.com/f304646673/RabbitMQDemo

相关文章:

RabbitMQ实践——搭建单人聊天服务

大纲 创建Core交换器用户登录发起聊天邀请接受邀请聊天实验过程总结代码工程 经过之前的若干节的学习&#xff0c;我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。 基本结构如下。为了避免Server有太多连线导致杂乱&#xff0c;下…...

GPT-5

欢迎来到 Papicatch的博客 文章目录 &#x1f349;技术突破预测 &#x1f348;算法进步 &#x1f348;理解力提升 &#x1f348;行业推动力 &#x1f349;人机协作的未来 &#x1f348;辅助决策 &#x1f348;增强创造力 &#x1f348;复杂任务中的角色 &#x1f348;人…...

Vip-智能预估+大数据标签+人群全选=用户分群!

Mobpush用户分群功能升级&#xff0c;创建推送入口vip用户可进入自有选择标签创建“用户分群”&#xff0c;相比于免费标签&#xff0c;“用户标签”维度更丰富。在应用基础属性上&#xff0c;增加“品牌”、“网络状态”、“运营商”&#xff0c;众所周知&#xff0c;不同厂商…...

SpringBoot异常处理机制之自定义404、500错误提示页面 - 518篇

历史文章&#xff08;文章累计500&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…...

为什么选择Xinstall CPA结算系统?因为它能帮您解决这些痛点!

在App推广和运营的道路上&#xff0c;我们时常面临着各种挑战和痛点。其中&#xff0c;结算系统的复杂性和不透明性往往成为制约我们发展的瓶颈。然而&#xff0c;有了Xinstall CPA结算系统&#xff0c;这些问题将迎刃而解&#xff0c;让您的App推广之路更加顺畅和高效。 一、…...

2024年【建筑电工(建筑特殊工种)】模拟试题及建筑电工(建筑特殊工种)作业考试题库

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年建筑电工(建筑特殊工种)模拟试题为正在备考建筑电工(建筑特殊工种)操作证的学员准备的理论考试专题&#xff0c;每个月更新的建筑电工(建筑特殊工种)作业考试题库祝您顺利通过建筑电工(建筑特殊工种)考试。 1、…...

解锁数字化转型的双引擎:MSP和CMP的力量

随着企业数字化转型的深入&#xff0c;云计算已经成为现代企业IT基础设施的重要组成部分。为了高效地管理和优化多云环境&#xff0c;企业通常会依赖管理服务提供商 (Managed Service Providers, MSP) 和云管理平台 (Cloud Management Platforms, CMP)。本文将探讨MSP和CMP的定…...

Pyecharts入门

数据可视化 Pyecharts简介 Apache ECharts 是一个由百度开源的数据可视化&#xff0c;凭借着良好的交互性&#xff0c;精巧的图表设计&#xff0c;得到了众多开发者的认可。而 Python 是一门富有表达力的语言&#xff0c;很适合用于数据处理。当数据分析遇上数据可视化时&#…...

Socket编程详解(一)服务端与客户端的双向对话

目录 预备知识 视频教程 项目前准备知识点 1、服务器端程序的编写步骤 2、客户端程序编写步骤 代码部分 1、服务端FrmServer.cs文件 2、客户端FrmClient.cs文件 3、启动文件Program.cs 结果展示 预备知识 请查阅博客http://t.csdnimg.cn/jE4Tp 视频教程 链接&#…...

使用Python实现深度学习模型:强化学习与深度Q网络(DQN)

深度Q网络(Deep Q-Network,DQN)是结合深度学习与强化学习的一种方法,用于解决复杂的决策问题。本文将详细介绍如何使用Python实现DQN,主要包括以下几个方面: 强化学习简介DQN算法简介环境搭建DQN模型实现模型训练与评估1. 强化学习简介 强化学习是一种训练智能体(agent…...

Py-Spy、Scalene 和 VizTracer 的对比分析

在前几篇文章中&#xff0c;我们详细介绍了如何使用 py-spy、scalene 和 viztracer 进行性能分析和优化。今天&#xff0c;我们将对这三个性能分析工具进行详细对比&#xff0c;帮助你选择最适合你的工具。 工具简介 Py-Spy&#xff1a; 实时性能分析&#xff1a;Py-Spy 可以…...

软考架构师考试内容

软考系统架构设计师考试是中国计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff08;简称软考&#xff09;中的一项高级资格考试&#xff0c;旨在评估考生是否具备系统架构设计的能力。根据提供的参考资料&#xff0c;考试内容主要包括以下几个方面&#…...

【MySQL基础篇】概述及SQL指令:DDL及DML

数据库是一个按照数据结构来组织、存储和管理数据的仓库。以下是对数据库概念的详细解释&#xff1a;定义与基本概念&#xff1a; 数据库是长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。 数据库不仅仅是数据的简单堆积&#xff0c;而是遵循一定的规则…...

计算机网络 —— 网络字节序

网络字节序 1、网络字节序 (Network Byte Order)和本机转换 1、大端、小端字节序 “大端” 和” 小端” 表示多字节值的哪一端存储在该值的起始地址处&#xff1b;小端存储在起始地址处&#xff0c;即是小端字节序&#xff1b;大端存储在起始地址处&#xff0c;即是大端字节…...

区块链不可能三角

区块链不可能三角&#xff1a;探索去中心化、安全与可扩展性的权衡 引言 区块链技术自诞生以来&#xff0c;以其去中心化、透明、安全等特点吸引了全球的关注&#xff0c;成为金融科技领域的重要革新力量。然而&#xff0c;随着区块链应用的日益广泛&#xff0c;一个核心问题…...

新手第一个漏洞复现:MS17-010(永恒之蓝)

文章目录 漏洞原理漏洞影响范围复现环境复现步骤 漏洞原理 漏洞出现在Windows SMB v1中的内核态函数srv!SrvOs2FeaListToNt在处理FEA&#xff08;File Extended Attributes&#xff09;转换时。该函数在将FEA list转换成NTFEA&#xff08;Windows NT FEA&#xff09;list前&am…...

代码随想录Day64

98.所有可达路径 题目&#xff1a;98. 所有可达路径 (kamacoder.com) 思路&#xff1a;果断放弃 答案 import java.util.*;public class Main {private static List<List<Integer>> adjList;private static List<List<Integer>> allPaths;private sta…...

Angular 指令

Angular 指令是 Angular 框架中的一项核心功能&#xff0c;它允许开发人员扩展 HTML 的功能&#xff0c;并创建可复用的组件和行为。以下是一些常见的 Angular 指令&#xff1a; 1. 组件指令 (Component Directives) 组件指令是最常用的一种指令&#xff0c;用于创建可复用的 U…...

移动端 UI 风格,书写华丽篇章

移动端 UI 风格&#xff0c;书写华丽篇章...

flutter开发实战-ListWheelScrollView与自定义TimePicker时间选择器

flutter开发实战-ListWheelScrollView与自定义TimePicker 最近在使用时间选择器的时候&#xff0c;需要自定义一个TimePicker效果&#xff0c;当然这里就使用了ListWheelScrollView。ListWheelScrollView与ListView类似&#xff0c;但ListWheelScrollView渲染效果类似滚筒效果…...

stable diffusion 模型和lora融合

炜哥的AI学习笔记——SuperMerger插件学习 - 哔哩哔哩接下来学习的插件名字叫做 SuperMerger,它的作用正如其名,可以融合大模型或者 LoRA,一般来说会结合之前的插件 LoRA Block Weight 使用,在调整完成 LoRA 模型的权重后使用改插件进行重新打包。除了 LoRA ,Checkpoint 也…...

Spring Boot中的分布式缓存方案

Spring Boot中的分布式缓存方案 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将探讨在Spring Boot应用中实现分布式缓存的方案&#xff0c;以提升系统…...

AI写作革命:如何用AI工具轻松搞定700+学科的论文?

不知道大家有没有发现&#xff0c;随着人工智能技术的快速发展&#xff0c;AI工具正逐渐渗透到我们日常生活的各个方面&#xff0c;极大地提高了我们的工作和学习效率。无论是AI写作、AI绘画、AI思维导图&#xff0c;还是AI幻灯片制作&#xff0c;这些工具已成为我们不可或缺的…...

v-for中key的原理以及用法

在 Vue.js 中&#xff0c;v-for 指令用于基于源数据多次渲染元素或模板块。当使用 v-for 渲染列表时&#xff0c;为每个列表项提供一个唯一的 key 属性是非常重要的。key 的主要作用是帮助 Vue 跟踪每个节点的身份&#xff0c;从而重用和重新排序现有元素。 先来张原理图&#…...

基于强化学习的目标跟踪论文合集

文章目录 2020UAV Maneuvering Target Tracking in Uncertain Environments Based on Deep Reinforcement Learning and Meta-LearningUAV Target Tracking in Urban Environments Using Deep Reinforcement Learning 2021Research on Vehicle Dispatch Problem Based on Kuhn-…...

高质量AIGC/ChatGPT/大模型资料分享

2023年要说科技圈什么最火爆&#xff0c;一定是ChatGPT、AIGC&#xff08;人工智能生成内容&#xff09;和大型语言模型。这些技术前沿如同科技世界的新潮流&#xff0c;巨浪拍岸&#xff0c;引发各界关注。ChatGPT的互动性和逼真度让人们瞠目&#xff0c;它能与用户展开流畅对…...

使用Python进行Socket接口测试

大家好&#xff0c;在现代软件开发中&#xff0c;网络通信是不可或缺的一部分。无论是传输数据、获取信息还是实现实时通讯&#xff0c;都离不开可靠的网络连接和有效的数据交换机制。而在网络编程的基础中&#xff0c;Socket&#xff08;套接字&#xff09;技术扮演了重要角色…...

C++编程逻辑讲解step by step:存折和信用卡类。

题目 存折和信用卡类&#xff0c;信用卡是一种存折&#xff0c;可以透支&#xff0c;可以存款。 代码 #include<iostream> #include<string> using namespace std; class passbook {public: passbook(string nam,int n,float m) {namenam; numn; balancem; } vo…...

为什么说BIM在机电安装行业是刚需?3D开发工具HOOPS如何促进BIM发展?

在建筑行业中&#xff0c;机电安装是一个复杂且精细的工程领域&#xff0c;它涉及到电气、管道、通风和控制系统等多个方面。随着建筑项目规模的不断扩大和复杂性的增加&#xff0c;传统的二维设计方法已经难以满足现代建筑的需求。正是在这种背景下&#xff0c;BIM技术应运而生…...

SQLite:一个极简使用教程

SQLite是一个轻量级的、文件系统基础的数据库&#xff0c;它被设计为配置简单、易于部署。SQLite数据库存储在一个单一的磁盘文件中&#xff0c;这意味着数据库的创建和维护都非常简单。 1. SQLite特点 轻量级&#xff1a;SQLite不需要一个独立的服务器进程。它是一个嵌入式SQ…...

asp做新闻网站/网络营销专业是做什么的

1. YOLOX的改进 YOLOX 以YOLO v3作为baseline主要做了以下改进&#xff1a; ①. 输入端的图像增强&#xff08;Mosaic、Mixup、RandomHorizontalFlip、ColorJitter、多尺度训练&#xff09; ②. Backbone&#xff08;Darknet53 SPP&#xff09; ③. Neck (FPN PAN) ④. Head…...

贵阳网站建设包首页/百度爱采购客服电话

系列文章目录 第二章计算机网络网络应用之Socket编程应用-应用编程接口&#xff08;API&#xff09; Socket编程-应用编程接口&#xff08;API&#xff09;系列文章目录一、Socket编程-应用编程接口&#xff08;API&#xff09;1.网络程序设计接口2.应用编程接口&#xff08;AP…...

饰品网站建设/网站关键词怎么添加

目录 搜索内容中包含learning或者Hadoop的文档 搜索标题中包含learning和Hadoop的bolg 搜索标题中包含java,Hadoop,spark,elasticsearch,4个关键字中至少三个的bolg 用bool组合多个搜索条件java,Hadoop,spark,elasticsearch,来搜索title 使用should如何搜索java,Hadoop,sp…...

商丘网站制作推广/嘉兴网站建设

一、解决什么问题1、html中img引入的图片地址没有被替换&#xff0c;找不到图片2、html公共部分复用问题&#xff0c;如头部、底部、浮动层等二、html中img引入图片问题解决1、在index.html插入img&#xff0c;引用图片2、npm run dev运行结果如下&#xff1a;因为图片地址没有…...

wordpress 图标不显示/市场营销方案

1.控制台&#xff1a;执行过 Hibernate: select count(*) as y0_ from V_TRANSDETAIL_INFO this_ 后报错&#xff1a;Caused by: java.sql.SQLSyntaxErrorException: ORA-01031: 权限不足分析&#xff1a;debug&#xff0c;跟进去确实是因为查询存储过程V_TRANSDETAIL_INFO…...

关键词优化怎样/丽水百度seo

转自&#xff1a;https://www.pinlue.com/article/2021/05/2918/4511619339951.html...