RabbitMQ实践——搭建单人聊天服务
大纲
- 创建Core交换器
- 用户登录
- 发起聊天邀请
- 接受邀请
- 聊天
- 实验过程
- 总结
- 代码工程
经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。
创建Core交换器
package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class Core {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;final String exchangeName = "Core";@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();createExchange(exchangeName);}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}
用户登录
用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。
private final ReentrantLock lock = new ReentrantLock();final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();public Flux<String> Login(String username) {createExclusiveQueue(username);createBanding(exchangeName, username, username);return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(username, (Message message) -> {String msg = new String(message.getBody());System.out.println("Received message: " + msg);emitter.next(msg);});container.start();});}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}private void createBanding(String exchangeName, String queueName, String routingKey) {rabbitTemplate.execute(channel -> {channel.queueBind(queueName, exchangeName, routingKey);return null;});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}
Controller如下
package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate Core core;@PostMapping(value = "/login", produces = "text/event-stream")public Flux<String> login(@RequestParam String username) {return core.Login(username);}
}
发起聊天邀请
发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。
package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;@Service
public class ChatRoom {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;@Dataprivate class ChatRoomInfo {private String exchange;private Map<String, String> usernameToQueuename;}private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();private final ReentrantLock lock = new ReentrantLock(); @PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> invite(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {createChatRoom(fromUsername, toUsername);}return talk(chatRoomName, fromUsername);}private void createChatRoom(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);String exchangeName = chatRoomName;String fromQueueName = "queue-" + fromUsername + "-" + toUsername;String toQueueName = "queue-" + toUsername + "-" + fromUsername;rabbitTemplate.execute(action -> {action.exchangeDeclare(exchangeName, "fanout", false, true, null);action.queueDeclare(fromQueueName, false, true, false, null);action.queueDeclare(toQueueName, false, true, false, null);action.queueBind(fromQueueName, exchangeName, "");action.queueBind(toQueueName, exchangeName, "");return null;});lock.lock();try {ChatRoomInfo chatRoomInfo = new ChatRoomInfo();chatRoomInfo.setExchange(exchangeName);chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));chatRooms.put(chatRoomName, chatRoomInfo);} finally {lock.unlock();}}
接受邀请
被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。
public Flux<String> accept(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);return talk(chatRoomName, toUsername);}private Flux<String> talk(String chatRoomName, String username) {ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}String queueName = chatRoomInfo.getUsernameToQueuename().get(username);return Flux.create(emitter -> {SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener((Message message) -> {String msg = new String(message.getBody());System.out.println(username + " received message: " + msg);emitter.next(msg);});listener.start();});}
聊天
聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。
public void chat(String fromUsername, String toUsername, String message) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {chatRoomName = getChatRoomName(toUsername, fromUsername);chatRoomInfo = chatRooms.get(chatRoomName);}if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);}private String getChatRoomName(String fromUsername, String toUsername) {return fromUsername + "-" + toUsername + "-chat-room";}
Controller侧代码
package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/chat")
public class ChatController {@Autowiredprivate Core core;@Autowiredprivate ChatRoom chatRoom;@PutMapping(value = "/invite", produces = "text/event-stream")public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {core.invite(fromUsername, toUsername);return chatRoom.invite(fromUsername, toUsername);}@PutMapping(value = "/accept", produces = "text/event-stream")public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {core.accept(fromUsername, toUsername);return chatRoom.accept(fromUsername, toUsername);}@PostMapping("/send")public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {chatRoom.chat(fromUsername, toUsername, message);}
}
实验过程
在Postman中,我们先让tom登录,然后jerry登录。
在后台,我们看到创建两个队列
以及Core交换器的绑定关系也被更新
Jerry向Tom发起聊天邀请
可以看到Tom收到了邀请
同时新增了两个队列
以及一个交换器
Tom通过下面请求接受邀请
Jerry收到Tom接受了邀请的通知
后面它们就可以聊天了
它们的聊天窗口都收到了消息
总结
本文主要使用的知识点:
- direct交换器以及其绑定规则
- fanout交换器
- 自动删除的交换器
- 自动删除的队列
- 只有一个消费者的队列
- WebFlux响应式编程
代码工程
https://github.com/f304646673/RabbitMQDemo
相关文章:
RabbitMQ实践——搭建单人聊天服务
大纲 创建Core交换器用户登录发起聊天邀请接受邀请聊天实验过程总结代码工程 经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。 基本结构如下。为了避免Server有太多连线导致杂乱,下…...
GPT-5
欢迎来到 Papicatch的博客 文章目录 🍉技术突破预测 🍈算法进步 🍈理解力提升 🍈行业推动力 🍉人机协作的未来 🍈辅助决策 🍈增强创造力 🍈复杂任务中的角色 🍈人…...
Vip-智能预估+大数据标签+人群全选=用户分群!
Mobpush用户分群功能升级,创建推送入口vip用户可进入自有选择标签创建“用户分群”,相比于免费标签,“用户标签”维度更丰富。在应用基础属性上,增加“品牌”、“网络状态”、“运营商”,众所周知,不同厂商…...
SpringBoot异常处理机制之自定义404、500错误提示页面 - 518篇
历史文章(文章累计500) 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…...
为什么选择Xinstall CPA结算系统?因为它能帮您解决这些痛点!
在App推广和运营的道路上,我们时常面临着各种挑战和痛点。其中,结算系统的复杂性和不透明性往往成为制约我们发展的瓶颈。然而,有了Xinstall CPA结算系统,这些问题将迎刃而解,让您的App推广之路更加顺畅和高效。 一、…...
2024年【建筑电工(建筑特殊工种)】模拟试题及建筑电工(建筑特殊工种)作业考试题库
题库来源:安全生产模拟考试一点通公众号小程序 2024年建筑电工(建筑特殊工种)模拟试题为正在备考建筑电工(建筑特殊工种)操作证的学员准备的理论考试专题,每个月更新的建筑电工(建筑特殊工种)作业考试题库祝您顺利通过建筑电工(建筑特殊工种)考试。 1、…...
解锁数字化转型的双引擎:MSP和CMP的力量
随着企业数字化转型的深入,云计算已经成为现代企业IT基础设施的重要组成部分。为了高效地管理和优化多云环境,企业通常会依赖管理服务提供商 (Managed Service Providers, MSP) 和云管理平台 (Cloud Management Platforms, CMP)。本文将探讨MSP和CMP的定…...
Pyecharts入门
数据可视化 Pyecharts简介 Apache ECharts 是一个由百度开源的数据可视化,凭借着良好的交互性,精巧的图表设计,得到了众多开发者的认可。而 Python 是一门富有表达力的语言,很适合用于数据处理。当数据分析遇上数据可视化时&#…...
Socket编程详解(一)服务端与客户端的双向对话
目录 预备知识 视频教程 项目前准备知识点 1、服务器端程序的编写步骤 2、客户端程序编写步骤 代码部分 1、服务端FrmServer.cs文件 2、客户端FrmClient.cs文件 3、启动文件Program.cs 结果展示 预备知识 请查阅博客http://t.csdnimg.cn/jE4Tp 视频教程 链接&#…...
使用Python实现深度学习模型:强化学习与深度Q网络(DQN)
深度Q网络(Deep Q-Network,DQN)是结合深度学习与强化学习的一种方法,用于解决复杂的决策问题。本文将详细介绍如何使用Python实现DQN,主要包括以下几个方面: 强化学习简介DQN算法简介环境搭建DQN模型实现模型训练与评估1. 强化学习简介 强化学习是一种训练智能体(agent…...
Py-Spy、Scalene 和 VizTracer 的对比分析
在前几篇文章中,我们详细介绍了如何使用 py-spy、scalene 和 viztracer 进行性能分析和优化。今天,我们将对这三个性能分析工具进行详细对比,帮助你选择最适合你的工具。 工具简介 Py-Spy: 实时性能分析:Py-Spy 可以…...
软考架构师考试内容
软考系统架构设计师考试是中国计算机技术与软件专业技术资格(水平)考试(简称软考)中的一项高级资格考试,旨在评估考生是否具备系统架构设计的能力。根据提供的参考资料,考试内容主要包括以下几个方面&#…...
【MySQL基础篇】概述及SQL指令:DDL及DML
数据库是一个按照数据结构来组织、存储和管理数据的仓库。以下是对数据库概念的详细解释:定义与基本概念: 数据库是长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。 数据库不仅仅是数据的简单堆积,而是遵循一定的规则…...
计算机网络 —— 网络字节序
网络字节序 1、网络字节序 (Network Byte Order)和本机转换 1、大端、小端字节序 “大端” 和” 小端” 表示多字节值的哪一端存储在该值的起始地址处;小端存储在起始地址处,即是小端字节序;大端存储在起始地址处,即是大端字节…...
区块链不可能三角
区块链不可能三角:探索去中心化、安全与可扩展性的权衡 引言 区块链技术自诞生以来,以其去中心化、透明、安全等特点吸引了全球的关注,成为金融科技领域的重要革新力量。然而,随着区块链应用的日益广泛,一个核心问题…...
新手第一个漏洞复现:MS17-010(永恒之蓝)
文章目录 漏洞原理漏洞影响范围复现环境复现步骤 漏洞原理 漏洞出现在Windows SMB v1中的内核态函数srv!SrvOs2FeaListToNt在处理FEA(File Extended Attributes)转换时。该函数在将FEA list转换成NTFEA(Windows NT FEA)list前&am…...
代码随想录Day64
98.所有可达路径 题目:98. 所有可达路径 (kamacoder.com) 思路:果断放弃 答案 import java.util.*;public class Main {private static List<List<Integer>> adjList;private static List<List<Integer>> allPaths;private sta…...
Angular 指令
Angular 指令是 Angular 框架中的一项核心功能,它允许开发人员扩展 HTML 的功能,并创建可复用的组件和行为。以下是一些常见的 Angular 指令: 1. 组件指令 (Component Directives) 组件指令是最常用的一种指令,用于创建可复用的 U…...
移动端 UI 风格,书写华丽篇章
移动端 UI 风格,书写华丽篇章...
flutter开发实战-ListWheelScrollView与自定义TimePicker时间选择器
flutter开发实战-ListWheelScrollView与自定义TimePicker 最近在使用时间选择器的时候,需要自定义一个TimePicker效果,当然这里就使用了ListWheelScrollView。ListWheelScrollView与ListView类似,但ListWheelScrollView渲染效果类似滚筒效果…...
stable diffusion 模型和lora融合
炜哥的AI学习笔记——SuperMerger插件学习 - 哔哩哔哩接下来学习的插件名字叫做 SuperMerger,它的作用正如其名,可以融合大模型或者 LoRA,一般来说会结合之前的插件 LoRA Block Weight 使用,在调整完成 LoRA 模型的权重后使用改插件进行重新打包。除了 LoRA ,Checkpoint 也…...
Spring Boot中的分布式缓存方案
Spring Boot中的分布式缓存方案 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨在Spring Boot应用中实现分布式缓存的方案,以提升系统…...
AI写作革命:如何用AI工具轻松搞定700+学科的论文?
不知道大家有没有发现,随着人工智能技术的快速发展,AI工具正逐渐渗透到我们日常生活的各个方面,极大地提高了我们的工作和学习效率。无论是AI写作、AI绘画、AI思维导图,还是AI幻灯片制作,这些工具已成为我们不可或缺的…...
v-for中key的原理以及用法
在 Vue.js 中,v-for 指令用于基于源数据多次渲染元素或模板块。当使用 v-for 渲染列表时,为每个列表项提供一个唯一的 key 属性是非常重要的。key 的主要作用是帮助 Vue 跟踪每个节点的身份,从而重用和重新排序现有元素。 先来张原理图&#…...
基于强化学习的目标跟踪论文合集
文章目录 2020UAV Maneuvering Target Tracking in Uncertain Environments Based on Deep Reinforcement Learning and Meta-LearningUAV Target Tracking in Urban Environments Using Deep Reinforcement Learning 2021Research on Vehicle Dispatch Problem Based on Kuhn-…...
高质量AIGC/ChatGPT/大模型资料分享
2023年要说科技圈什么最火爆,一定是ChatGPT、AIGC(人工智能生成内容)和大型语言模型。这些技术前沿如同科技世界的新潮流,巨浪拍岸,引发各界关注。ChatGPT的互动性和逼真度让人们瞠目,它能与用户展开流畅对…...
使用Python进行Socket接口测试
大家好,在现代软件开发中,网络通信是不可或缺的一部分。无论是传输数据、获取信息还是实现实时通讯,都离不开可靠的网络连接和有效的数据交换机制。而在网络编程的基础中,Socket(套接字)技术扮演了重要角色…...
C++编程逻辑讲解step by step:存折和信用卡类。
题目 存折和信用卡类,信用卡是一种存折,可以透支,可以存款。 代码 #include<iostream> #include<string> using namespace std; class passbook {public: passbook(string nam,int n,float m) {namenam; numn; balancem; } vo…...
为什么说BIM在机电安装行业是刚需?3D开发工具HOOPS如何促进BIM发展?
在建筑行业中,机电安装是一个复杂且精细的工程领域,它涉及到电气、管道、通风和控制系统等多个方面。随着建筑项目规模的不断扩大和复杂性的增加,传统的二维设计方法已经难以满足现代建筑的需求。正是在这种背景下,BIM技术应运而生…...
SQLite:一个极简使用教程
SQLite是一个轻量级的、文件系统基础的数据库,它被设计为配置简单、易于部署。SQLite数据库存储在一个单一的磁盘文件中,这意味着数据库的创建和维护都非常简单。 1. SQLite特点 轻量级:SQLite不需要一个独立的服务器进程。它是一个嵌入式SQ…...
asp做新闻网站/网络营销专业是做什么的
1. YOLOX的改进 YOLOX 以YOLO v3作为baseline主要做了以下改进: ①. 输入端的图像增强(Mosaic、Mixup、RandomHorizontalFlip、ColorJitter、多尺度训练) ②. Backbone(Darknet53 SPP) ③. Neck (FPN PAN) ④. Head…...
贵阳网站建设包首页/百度爱采购客服电话
系列文章目录 第二章计算机网络网络应用之Socket编程应用-应用编程接口(API) Socket编程-应用编程接口(API)系列文章目录一、Socket编程-应用编程接口(API)1.网络程序设计接口2.应用编程接口(AP…...
饰品网站建设/网站关键词怎么添加
目录 搜索内容中包含learning或者Hadoop的文档 搜索标题中包含learning和Hadoop的bolg 搜索标题中包含java,Hadoop,spark,elasticsearch,4个关键字中至少三个的bolg 用bool组合多个搜索条件java,Hadoop,spark,elasticsearch,来搜索title 使用should如何搜索java,Hadoop,sp…...
商丘网站制作推广/嘉兴网站建设
一、解决什么问题1、html中img引入的图片地址没有被替换,找不到图片2、html公共部分复用问题,如头部、底部、浮动层等二、html中img引入图片问题解决1、在index.html插入img,引用图片2、npm run dev运行结果如下:因为图片地址没有…...
wordpress 图标不显示/市场营销方案
1.控制台:执行过 Hibernate: select count(*) as y0_ from V_TRANSDETAIL_INFO this_ 后报错:Caused by: java.sql.SQLSyntaxErrorException: ORA-01031: 权限不足分析:debug,跟进去确实是因为查询存储过程V_TRANSDETAIL_INFO…...
关键词优化怎样/丽水百度seo
转自:https://www.pinlue.com/article/2021/05/2918/4511619339951.html...