【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)
【通用消息通知服务】0x3 - 发送我们第一条消息
项目地址: A generic message notification system[Github]
实现接收/发送Websocket消息
Websocket Connection Pool
import asyncio
from asyncio.queues import Queue
from asyncio.queues import QueueEmpty
from contextlib import suppress
from typing import Anyimport async_timeout
import orjson
from sanic.log import logger
from ulid import ULIDfrom common.depend import DependencyPING = "#ping"
PONG = "#pong"class WebsocketConnectionPoolDependency(Dependency, dependency_name="WebsocketPool", dependency_alias="ws_pool"
):def __init__(self, app) -> None:super().__init__(app)self.lock = asyncio.Lock()self.connections = {} # 存储websocket connectionsself.send_queues = {} # 各websocket发送队列self.recv_queues = {} # 各websocket接收消息队列self.close_callbacks = {} # websocket销毁回调self.listeners = {} # 连接监听函数def _gen_id(self) -> str:return str(ULID())async def add_connection(self, connection) -> str:async with self.lock:id = self._gen_id()self.connections[id] = connectionself.send_queues[id] = Queue()self.app.add_task(self.send_task(self.send_queues[id], connection),name=f"websocket_{id}_send_task",)self.recv_queues[id] = Queue()self.app.add_task(self.recv_task(self.recv_queues[id], connection),name=f"websocket_{id}_recv_task",)self.app.add_task(self.notify_task(id), name=f"websocket_{id}_notify_task")self.app.add_task(self.is_alive_task(id), name=f"websocket_{id}_is_alive_task")setattr(connection, "_id", id)return connection._iddef get_connection(self, connection_id: str):return self.connections.get(connection_id)async def add_listener(self, connection_id, handler) -> str:async with self.lock:id = self._gen_id()self.listeners.setdefault(connection_id, {}).update({id: handler})return idasync def remove_listener(self, connection_id, listener_id):async with self.lock:self.listeners.get(connection_id, {}).pop(listener_id, None)async def add_close_callback(self, connection_id, callback):async with self.lock:self.close_callbacks.setdefault(connection_id, []).append(callback)def is_alive(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idreturn connection_id in self.connectionsasync def remove_connection(self, connection: Any):if hasattr(connection, "_id"):connection_id = connection._idelse:connection_id = connectionif connection_id not in self.connections:# removed alreadyreturnasync with self.lock:logger.info(f"remove connection: {connection_id}")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_send_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_recv_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_notify_task")with suppress(Exception):await self.app.cancel_task(f"websocket_{connection_id}_is_alive_task")if connection_id in self.send_queues:del self.send_queues[connection_id]if connection_id in self.recv_queues:del self.recv_queues[connection_id]if connection_id in self.listeners:del self.listeners[connection_id]if connection_id in self.close_callbacks:await self.do_close_callbacks(connection_id)del self.close_callbacks[connection_id]if connection_id in self.connections:del self.connections[connection_id]async def do_close_callbacks(self, connection_id):for cb in self.close_callbacks.get(connection_id, []):self.app.add_task(cb(connection_id))async def prepare(self):self.is_prepared = Truelogger.info("dependency:WebsocketPool is prepared")return self.is_preparedasync def check(self):return Trueasync def send_task(self, queue, connection):while self.is_alive(connection):try:data = queue.get_nowait()except QueueEmpty:await asyncio.sleep(0)continuetry:if isinstance(data, (bytes, str, int)):await connection.send(data)else:await connection.send(orjson.dumps(data).decode())queue.task_done()except Exception as err:breakasync def recv_task(self, queue, connection):while self.is_alive(connection):try:data = await connection.recv()await queue.put(data)logger.info(f"recv message: {data} from connection: {connection._id}")except Exception as err:breakasync def notify_task(self, connection_id):while self.is_alive(connection_id):try:logger.info(f"notify connection: {connection_id}'s listeners")data = await self.recv_queues[connection_id].get()for listener in self.listeners.get(connection_id, {}).values():await listener(connection_id, data)except Exception as err:passasync def is_alive_task(self, connection_id: str):if hasattr(connection_id, "_id"):connection_id = connection_id._idget_pong = asyncio.Event()async def wait_pong(connection_id, data):if data != PONG:returnget_pong.set()while True:get_pong.clear()await self.send(connection_id, PING)listener_id = await self.add_listener(connection_id, wait_pong)with suppress(asyncio.TimeoutError):async with async_timeout.timeout(self.app.config.WEBSOCKET_PING_TIMEOUT):await get_pong.wait()await self.remove_listener(connection_id, listener_id)if get_pong.is_set():# this connection is closedawait asyncio.sleep(self.app.config.WEBSOCKET_PING_INTERVAL)else:await self.remove_connection(connection_id)async def wait_closed(self, connection_id: str):"""if negative=True, only release when client close this connection."""while self.is_alive(connection_id):await asyncio.sleep(0)return Falseasync def send(self, connection_id: str, data: Any) -> bool:if not self.is_alive(connection_id):return Falseif connection_id not in self.send_queues:return Falseawait self.send_queues[connection_id].put(data)return True
Websocket Provider
from typing import Dict
from typing import List
from typing import Unionfrom pydantic import BaseModel
from pydantic import field_serializer
from sanic.log import loggerfrom apps.message.common.constants import MessageProviderType
from apps.message.common.constants import MessageStatus
from apps.message.common.interfaces import SendResult
from apps.message.providers.base import MessageProviderModel
from apps.message.validators.types import EndpointExID
from apps.message.validators.types import EndpointTag
from apps.message.validators.types import ETag
from apps.message.validators.types import ExID
from utils import get_appclass WebsocketMessageProviderModel(MessageProviderModel):class Info:name = "websocket"description = "Bio-Channel Communication"type = MessageProviderType.WEBSOCKETclass Capability:is_enabled = Truecan_send = Trueclass Message(BaseModel):connections: List[Union[EndpointTag, EndpointExID, str]]action: strpayload: Union[List, Dict, str, bytes]@field_serializer("connections")def serialize_connections(self, connections):return list(set(map(str, connections)))async def send(self, provider_id, message: Message) -> SendResult:app = get_app()websocket_pool = app.ctx.ws_poolsent_list = set()connections = []for connection in message.connections:if isinstance(connection, ETag):connections.extend([wfor c in await connection.decode()for w in c.get("websockets", [])])elif isinstance(connection, ExID):endpoint = await connection.decode()if endpoint:connections.extend(endpoint.get("websockets", []))else:connections.append(connection)connections = list(set(filter(lambda x: app.ctx.ws_pool.is_alive(connection), connections)))# logger.info(f"sending websocket message to {connections}")for connection in connections:if await websocket_pool.send(connection, data=message.model_dump_json(exclude=["connections"])):sent_list.add(connection)if sent_list:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.SUCCEEDED)else:return SendResult(provider_id=provider_id, message=message, status=MessageStatus.FAILED)
websocket接口
@app.websocket("/websocket")
async def handle_websocket(request, ws):from apps.endpoint.listeners import register_websocket_endpointfrom apps.endpoint.listeners import unregister_websocket_endpointcon_id = Nonetry:ctx = request.app.ctxcon_id = await ctx.ws_pool.add_connection(ws)logger.info(f"new connection connected -> {con_id}")await ctx.ws_pool.add_listener(con_id, register_websocket_endpoint)await ctx.ws_pool.add_close_callback(con_id, unregister_websocket_endpoint)await ctx.ws_pool.send(con_id, data={"action": "on.connect", "payload": {"connection_id": con_id}})await ctx.ws_pool.wait_closed(con_id) # 等待连接断开finally:# 如果连接被客户端断开, handle_websocket将会被直接销毁, 所以销毁处理需要放在finally。request.app.add_task(request.app.ctx.ws_pool.remove_connection(con_id))
结果截图

相关文章:
【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)
【通用消息通知服务】0x3 - 发送我们第一条消息 项目地址: A generic message notification system[Github] 实现接收/发送Websocket消息 Websocket Connection Pool import asyncio from asyncio.queues import Queue from asyncio.queues import QueueEmpty from contextli…...
Eclipse打jar包与JavaDOC文档的生成
补充知识点——Eclipse打jar包与JavaDOC文档的生成 1、Eclipse如何打jar包,如何运行jar包 Java当中编写的Java代码,Java类、方法、接口这些东西就是项目中相关内容,到时候我们需要把代码提供给甲方、或者是我们需要运行我们编写的代码&…...
力扣:80. 删除有序数组中的重复项 II(Python3)
题目: 给你一个有序数组 nums ,请你 原地 删除重复出现的元素,使得出现次数超过两次的元素只出现两次 ,返回删除后数组的新长度。 不要使用额外的数组空间,你必须在 原地 修改输入数组 并在使用 O(1) 额外空间的条件下…...
linux:需要注意docker和aws的rds的mysql默认是UTC而不是中国时区
问题: 如题 解决办法: docker参考: mysql时间不对,修改时区_set global time_zone 无效_《小书生》的博客-CSDN博客 aws参考: https://www.youtube.com/watch?vB-NaqV-A1BY mysql - AWS修改RDS时区 - 个人文章 - Segm…...
访问 GitHub 方法
访问 GitHub 方法 方法一:最常见的就是 fq,但这个是违法的行为,自己私下搞可以,不能教你们。 方法二:利用加速器,这是正规合法操作。这里推荐一个免费的加速器,下载安装 Watt Toolkit加速器,原名…...
旅游APP外包开发注意事项
旅游类APP通常具有多种功能,以提供给用户更好的旅行体验。以下分享常见的旅游类APP功能以及在开发和使用这些APP时需要注意的问题,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 常见功能…...
ROS机器人编程---------(二)ROS中的核心概念
ROS机器人编程 ROS中的核心概念 ROS的通信机制 在ROS中结点是最小单元,比如说机器人的遥控器可以作为一个控制结点,机器人上的摄像头也可以看作一个结点,ROS通过协调各个结点来实现 在启动任何ROS结点之前,都必须先启动ROS Mas…...
Python学习教程:进程的调度
前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 要想多个进程交替运行,操作系统必须对这些进程进行调度, 这个调度也不是随即进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。 python更多源码/资料/解答/教程等 …...
ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案
ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案 本文是ElasticSearch第三讲,在了解ElaticSearch之后,我们还要了解Elastic背后的生态 即我们常说的ELK;与此同时,还会给你展示ElasticSearch的案例场景&…...
基于Java+SpringBoot+Vue前后端分离农商对接系统设计和实现
博主介绍:✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专…...
【模方ModelFun】实景三维建模和修模4.0.7最新版安装包以及图文安装教程
模方ModelFun 具有多种功能,旨在帮助用户进行实景三维建模和修模。以下是一些主要功能的简要介绍: 实景三维建模:【模方ModelFun】提供了自动化的实景三维重建功能,可以从实景图像中提取几何形状和纹理信息,生成高质量…...
介绍几个搜索引擎
Google:全球最大的搜索引擎,提供全面的搜索服务,包括网页、图片、视频、新闻、地图等。 Baidu:中国最大的搜索引擎,提供类似于Google的全面搜索服务,同时也有网盘、知道等功能。 Bing:微软公司…...
iPhone 隔空投送使用指南:详细教程
本文介绍了如何在iPhone上使用隔空投送,包括如何在iOS 11到iOS 14的iPhone上启用它、发送文件以及接受或拒绝AirDrop发送给你的文件。对于iOS 7以上的旧款iPhone,提供了另一种方法。 如何打开隔空投送 你可以通过以下两种方式之一启动隔空投送功能:在“设置”应用程序或控…...
百度文心一言GPT免费入口也来了!!!
文心一言入口地址:文心一言能力全面开放 文心一言是百度全新一代知识增强大语言模型,文心大模型家族的新成员,能够与人对话互动,回答问题,协助创作,高效便捷地帮助人们获取信息、知识和灵感。 文心一言的技…...
线程调度和线程控制
在Java中,线程调度和线程控制是多线程编程中重要的概念,它们用于管理和控制线程的执行。以下是关于线程调度和线程控制的一些重要概念和技术: **1. 线程调度(Thread Scheduling): ** 线程调度是操作系统或Java虚拟机决定哪个线程在何时执行的过程。Java提供了多种线程调度…...
laravel excel导入导出
一、安装第三方 composer require maatwebsite/excel版本2.1和现在版本 有所不一样 二、导入 <?php namespace App\Import; use Maatwebsite\Excel\Concerns\ToCollection;class TestImport implements ToCollection {public function __construct(){}public function c…...
Windows无法删除分区怎么办?
我们知道Windows系统内置的磁盘管理工具是一个很实用的程序,可以帮助我们完成很多磁盘分区相关的基础操作,比如当我们想要删除硬盘上的某一个分区时,先想到的可能会是磁盘管理工具。但是当我们准备在磁盘管理工具中删除某个分区时,…...
【请求报错:javax.net.ssl.SSLHandshakeException: No appropriate protocol】
1、问题描述 在请求服务时报错说SSL握手异常协议禁用啥的,而且我的连接数据库的url也加了useSSLfalse javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)2、解决方法 在网上查找了方法…...
elementUI textarea可自适应文本高度的文本域
效果图; 通过设置 autosize 属性可以使得文本域的高度能够根据文本内容自动进行调整,并且 autosize 还可以设定为一个对象,指定最小行数和最大行数。 <el-inputtype"textarea"autosizeplaceholder"请输入内容"v-model"te…...
WebRTC-Streamer交叉编译
WebRTC-Streamer交叉编译 flyfish 文章目录 WebRTC-Streamer交叉编译零、前言一、提前准备工作1 安装需要的工具2 可选的交叉编译工具3 默认执行python是python34 获取源码5 使用其他版本的方法 二、非交叉编译编译1 在 src目录执行 安装所需的依赖2 执行命令 三、 交叉编译1 …...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
Angular微前端架构:Module Federation + ngx-build-plus (Webpack)
以下是一个完整的 Angular 微前端示例,其中使用的是 Module Federation 和 npx-build-plus 实现了主应用(Shell)与子应用(Remote)的集成。 🛠️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...
【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看
文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...
从物理机到云原生:全面解析计算虚拟化技术的演进与应用
前言:我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM(Java Virtual Machine)让"一次编写,到处运行"成为可能。这个软件层面的虚拟化让我着迷,但直到后来接触VMware和Doc…...
