fastapi实现websocket在线聊天
最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题
1.问题难点
在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分布在不同的进程中,当进行通信时,可能无法找到已连接的连接对象。
2.解决方案
使用使用redis的订阅发布机制,使所有的进程都能进行消息订阅。这样能保证每个进程收到消息后都会进行相关的信息处理了。
3.方案设计
- 每个进程启动的时候都进行一个消息的订阅。
- 通过http请求,进行消息发布。
- 每个进程收到发布的消息后,进行判断是否由自己进行处理。
4.代码实现
①在服务启动时,进行消息订阅,并一直监听消息通道。当有消息发布时,进行消息处理。
# 初始化app
app = FastAPI(title="Ws Chat", description="测试", version="1.0.0")
app.openapi_version = "3.0.0"app.include_router(chat.app, prefix='/api/chat', tags=['Chat'])@app.on_event('startup')
async def on_startup():print(f"订阅初始化:{os.getpid()}")# 执行消息订阅机制https://aioredis.readthedocs.io/en/latest/examples/loop = asyncio.get_event_loop()loop.create_task(register_pubsub())async def reader(channel):# 进行消息的消费async for msg in channel.listen(): # 监听通道# print(msg)msg_data = msg.get("data")if msg_data and isinstance(msg_data, str):msg_data_dict = json.loads(msg_data)print(f"chat:{msg_data_dict}")sender = msg_data_dict.get("sender")# 进行消息处理await chat.cm.handle_websocket_message(msg_data_dict, sender)async def register_pubsub():pool = aioredis.from_url("redis://{}".format(host), db=db, password=password, port=port, encoding="utf-8", decode_responses=True)psub = pool.pubsub()async with psub as p:# 消息订阅await p.subscribe("chat")await reader(p)await p.unsubscribe("chat")
②websocket处理类
from fastapi import WebSocket, WebSocketDisconnectclass ConnectionManager:def __init__(self):# 保存当前所有的链接的websocket对象self.websocket_connections = {}async def connect(self, websocket: WebSocket, client_id):# 添加连接并发送欢迎消息await websocket.accept()self.websocket_connections[client_id] = websocketawait websocket.send_json({"type": "system","msg": "Welcome to the chat app!","sender": "system","recipient": client_id})try:# 处理消息while True:# 获取信息message = await websocket.receive_json()# 处理发送信息await self.handle_websocket_message(message, client_id)except WebSocketDisconnect:# 连接断开时移除连接del self.websocket_connections[client_id]async def handle_websocket_message(self, message: dict, client_id):# 处理私聊消息if message.get("type") == "private_message":recipient = message.get("recipient")msg = message.get("msg")recipient_conn = self.websocket_connections.get(recipient)if recipient_conn:# 在线await recipient_conn.send_json({"type": "private_message","sender": client_id,"msg": msg,"recipient": recipient})async def broadcast(self, message: dict):# 循环变量给所有在线激活的链接发送消息-全局广播for connection in self.websocket_connections:await connection.send_text(message)async def close(self, websocket: WebSocket, client_id):# 断开客户端的链接await websocket.close()del self.websocket_connections[client_id]async def disconnect(self, user_id):websocket: WebSocket = self.websocket_connections[user_id]await websocket.close()del self.websocket_connections[user_id]
③websocket连接
from app.chat_manager.server import ConnectionManagercm = ConnectionManager()@app.websocket("/connect_chat")
async def connect_chat(websocket: WebSocket, user_code: str):try:await cm.connect(websocket, user_code)except WebSocketDisconnect:# 连接断开时移除连接del cm.websocket_connections[user_code]
④http请求进行消息发布
@app.post("/create_chat", summary="发起聊天")
async def create_chat(param: DiagnosisChatSch, r=Depends(get_redis)):""""""ws_param = {"type": "private_message","msg": param.msg,"sender": param.sender,"recipient": param.recipient}# 进行消息发布await r.publish('diagnosis_chat', json.dumps(ws_param))return {'code': 200, 'msg': '成功', 'data': ''}
5.源码
github源码地址:https://github.com/zhangyukuo/fastapi_ws_chat
6.参考文章
https://www.cnblogs.com/a00ium/p/16931133.html
相关文章:
fastapi实现websocket在线聊天
最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题 1.问题难点 在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分…...
LLM推理部署(六):TogetherAI推出世界上LLM最快推理引擎,性能超过vLLM和TGI三倍
LLM能有多快?答案在于LLM推理的最新突破。 TogetherAI声称,他们在CUDA上构建了世界上最快的LLM推理引擎,该引擎运行在NVIDIA Tensor Core GPU上。Together推理引擎可以支持100多个开源大模型,比如Llama-2,并在Llama-2–…...
Unity | 渡鸦避难所-2 | 搭建场景并添加碰撞器
1 规范项目结构 上期中在导入一系列的商店资源包后,Assets 目录已经变的混乱不堪 开发过程中,随着资源不断更新,遵循一定的项目结构和设计规范是非常必要的。这可以增加项目的可读性、维护性、扩展性以及提高团队协作效率 这里先做下简单的…...
展望2024年供应链安全
2023年是开展供应链安全,尤其是开源治理如火如荼的一年,开源治理是供应链安全最重要的一个方面,所以我们从开源治理谈起。我们先回顾一下2023的开源治理情况。我们从信通院《2023年中国企业开源治理全景观察》发布的信息。信通院调研了来自七…...
React 列表页实现
一、介绍 列表页是常用的功能,从后端获取列表数据,刷新到页面上。开发列表页需要考虑以下技术要点:1.如何翻页;2.如何进行内容搜索;3.何时进行页面刷新。 二、使用教程 1.user-service 根据用户id获取用户列表,返回…...
【程序人生】还记得当初自己为什么选择计算机?
✏️ 初识计算机: 还记得人生中第一次接触计算机编程是在高中,第一门编程语言是Python(很可惜由于条件限制的原因,当时没能坚持学下去......现在想来有点后悔,没能坚持,唉......)。但是…...
9-tornado-Template优化方法、个人信息案例、tornado中ORM的使用(peewee的使用、peewee_async)、WTForms的使用
在很多情况下,前端模板中在很多页面有都重复的内容可以使用,比如页头、页尾、甚至中间的内容都有可能重复。这时,为了提高开发效率,我们就可以考虑在共同的部分提取出来, 主要方法有如下: 1. 模板继承 2. U…...
IDEA中.java .class .jar的含义与联系
当使用IntelliJ IDEA这样的集成开发环境进行Java编程时,通常涉及.java源代码文件、.class编译后的字节码文件以及.jar可执行的Java存档文件。 1. .java 文件: 1.这些文件包含了Java源代码,以文本形式编写。它们通常位于项目中的源代码目录中…...
北斗三号短报文森林消防应急通信及天通野外图传综合方案
森林火灾突发性强、破坏性大、危险性高,是全球发生最频繁、处置最困难、危害最严重的自然灾害之一,是生态文明建设成果和森林资源安全的最大威胁,甚至可能引发生态灾难和社会危机。我国总体上是一个缺林少绿、生态脆弱的国家,是一…...
js Array.every()的使用
2023.12.13今天我学习了如何使用Array.every()的使用,这个方法是用于检测数组中所有存在的元素。 比如我们需要判断这个数组里面的全部元素是否都包含张三,可以这样写: let demo [{id: 1, name: 张三}, {id: 2, name: 张三五}, {id: 3, name…...
前端编码中快速填充内容--乱数假文
写前端页面的时候,如果要快速插入图片,可以使用 https://picsum.photos/ 详见笔者这篇博文: 工具网站:随机生成图片的网站-CSDN博客 可是,如果要快速填充文字内容该怎么做呢? 以前,我们都是…...
数据结构二维数组计算题,以行为主?以列为主?
1.假设以行序为主序存储二维数组Aarray[1..100,1..100],设每个数据元素占2个存储单元,基地址为10,则LOC[5,5]( )。 A.808 B.818 C.1010 D&…...
springboot(ssm电影院订票信息管理系统 影院购票系统Java系统
springboot(ssm电影院订票信息管理系统 影院购票系统Java系统 开发语言:Java 框架:ssm/springboot vue JDK版本:JDK1.8(或11) 服务器:tomcat 数据库:mysql 5.7(或8.0࿰…...
AI 问答-供应链管理-相关概念:SCM、SRM、MDM、DMS、ERP、OBS、CRM、WMS...
一、供应链管理是什么 供应链管理:理解供应链管理_snowli的博客-CSDN博客 二、SCM 供应链管理 SCM全称为“Supply Chain Management”,即供应链管理。 SCM是企业管理范畴中一个非常重要的概念,指的是企业与供应商、生产商、分销商等各方之…...
初学vue3与ts:vue3选项式api获取当前路由地址
vue2的获取方法 this.$route.pathvue3选项式api获取方法 import { useRouter } from vue-router; const router useRouter(); console.log(router) console.log(router.currentRoute.value.path)...
2023最新大模型实验室解决方案
人工智能是引领未来的新兴战略性技术,是驱动新一轮科技革命和产业变革的重要力量。近年来,人工智能相关技术持续演进,产业化和商业化进程不断提速,正在加快与千行百业深度融合。 大模型实验室架构图 大模型实验室建设内容 一、课…...
leetcode707.设计链表
题目描述 你可以选择使用单链表或者双链表,设计并实现自己的链表。 单链表中的节点应该具备两个属性:val 和 next 。val 是当前节点的值,next 是指向下一个节点的指针/引用。 如果是双向链表,则还需要属性 prev 以指示链表中的…...
【K8s】Kubernetes CRD 介绍(控制器)
文章目录 CRD 概述1. 操作CRD1.1 创建 CRD1.2 操作 CRD 2. 其他笔记2.1 Kubectl 发现机制2.2 校验 CR2.3 简称和属性 3. 架构设计3.1 控制器概览 参考 CRD 概述 CR(Custom Resource)其实就是在 Kubernetes 中定义一个自己的资源类型,是一个具…...
Python 小程序之PDF文档加解密
PDF文档的加密和解密 文章目录 PDF文档的加密和解密前言一、总体构思二、使用到的库三、PDF文档的加密1.用户输入模块2.打开并读取文档数据3.遍历保存数据到新文档4.新文档进行加密5.新文档命名生成路径6.保存新加密的文档 四、PDF文档的解密1.用户输入模块2.前提准备2.文件解密…...
使用python脚本一个简单的搭建ansible集群
1.环境说明: 角色主机名ip地址控制主机server192.168.174.150受控主机/被管节点client1192.168.174.151受控主机/被管节点client2192.168.174.152 2.安装python和pip包 yum install -y epel-release yum install -y python python-pip 3.pip安装依赖库 pip in…...
关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践
6月5日,2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席,并作《智能体在安全领域的应用实践》主题演讲,分享了在智能体在安全领域的突破性实践。他指出,百度通过将安全能力…...
libfmt: 现代C++的格式化工具库介绍与酷炫功能
libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库,提供了高效、安全的文本格式化功能,是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全:…...
【UE5 C++】通过文件对话框获取选择文件的路径
目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 ,这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器,右键点击 .uproject 文件,选择 "Generate Visual Studio project files",重…...
Unity VR/MR开发-VR开发与传统3D开发的差异
视频讲解链接:【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...
JS红宝书笔记 - 3.3 变量
要定义变量,可以使用var操作符,后跟变量名 ES实现变量初始化,因此可以同时定义变量并设置它的值 使用var操作符定义的变量会成为包含它的函数的局部变量。 在函数内定义变量时省略var操作符,可以创建一个全局变量 如果需要定义…...
python打卡第47天
昨天代码中注意力热图的部分顺移至今天 知识点回顾: 热力图 作业:对比不同卷积层热图可视化的结果 def visualize_attention_map(model, test_loader, device, class_names, num_samples3):"""可视化模型的注意力热力图,展示模…...
【汇编逆向系列】六、函数调用包含多个参数之多个整型-参数压栈顺序,rcx,rdx,r8,r9寄存器
从本章节开始,进入到函数有多个参数的情况,前面几个章节中介绍了整型和浮点型使用了不同的寄存器在进行函数传参,ECX是整型的第一个参数的寄存器,那么多个参数的情况下函数如何传参,下面展开介绍参数为整型时候的几种情…...
前端打包工具简单介绍
前端打包工具简单介绍 一、Webpack 架构与插件机制 1. Webpack 架构核心组成 Entry(入口) 指定应用的起点文件,比如 src/index.js。 Module(模块) Webpack 把项目当作模块图,模块可以是 JS、CSS、图片等…...
Ansible+Zabbix-agent2快速实现对多主机监控
ansible Ansible 是一款开源的自动化工具,用于配置管理(Configuration Management)、应用部署(Application Deployment)、任务自动化(Task Automation)和编排(Orchestration…...
