消息推送只会用websocket、轮询?试试SSE,轻松高效。
SSE介绍
HTTP Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,它允许服务器向客户端推送数据,而无需客户端发起请求。以下是 HTTP SSE 的主要特点:
单向通信:
SSE 是一种单向通信协议,服务器可以主动向客户端推送数据,而客户端只能被动接收数据。
持久连接:
SSE 使用 HTTP 持久连接(long-lived connection)来保持客户端与服务器之间的连接,避免频繁地重新建立连接。
事件驱动:
SSE 采用事件驱动的方式,服务器将数据封装成事件推送给客户端,客户端可以根据事件类型进行相应的处理。
简单易用:
SSE 的协议简单,基于标准的 HTTP 协议,可以在任何支持 HTTP 的环境中使用。
客户端和服务器端的实现也相对简单,开发成本较低。
可靠性:
SSE 基于 HTTP 协议,可以利用 HTTP 的重试机制来提高数据传输的可靠性。
如果连接断开,客户端可以自动重新连接并恢复数据传输。
浏览器支持:
主流浏览器(Chrome、Firefox、Safari 等)都原生支持 SSE。
对于不支持 SSE 的浏览器,可以使用 polyfill 库来实现兼容性。
应用场景:
SSE 适用于实时性要求较高的场景,如聊天应用、体育赛事直播、股票行情更新等。
与 WebSocket 相比,SSE 更加轻量级,适用于一些对实时性要求不太高但需要持续更新的场景。
总的来说,HTTP SSE 提供了一种简单、可靠、高效的服务器推送机制,可以在各种 Web 应用中得到广泛应用。它是 Web 实时通信技术的一种重要补充。
与websocket对比
HTTP Server-Sent Events (SSE) 和 WebSocket 都是实现服务器与客户端之间实时双向通信的技术,但它们在某些方面存在一些差异。以下是它们的对比:
- 通信模式:
- SSE 是单向通信,服务器只能主动推送数据给客户端,客户端只能被动接收。
- WebSocket 是双向通信,服务器和客户端可以互相发送和接收数据。
- 连接方式:
- SSE 使用标准的 HTTP 连接,利用 HTTP 持久连接来保持连接。
- WebSocket 使用独立的 WebSocket 协议,建立全双工的 TCP 连接。
- 传输协议:
- SSE 使用标准的 HTTP 协议,数据以文本的形式传输。
- WebSocket 使用自己的二进制协议,可以传输二进制数据。
- 浏览器支持:
- SSE 被大多数现代浏览器原生支持。
- WebSocket 也被大多数现代浏览器原生支持。
- 可靠性:
- SSE 可以利用 HTTP 的重试机制来提高数据传输的可靠性。
- WebSocket 建立在 TCP 协议之上,也具有较高的可靠性。
- 实时性:
- SSE 的实时性略低于 WebSocket,因为它需要依赖 HTTP 的连接机制。
- WebSocket 建立在独立的 TCP 连接之上,实时性更高。
- 应用场景:
- SSE 更适合于一些实时性要求不太高但需要持续更新的场景,如聊天应用、体育赛事直播等。
- WebSocket 更适合于需要实时双向通信的场景,如在线游戏、视频会议等。
总的来说,SSE 和 WebSocket 都是实现服务器与客户端实时通信的有效方式,它们各有优缺点,适用于不同的应用场景。在选择时需要根据具体的需求来权衡取舍。
上代码
主体工具类 SseUtil
import com.alibaba.fastjson.JSON;
import com.enums.EnumDeviceType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.Resource;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;/*** SSE 通信工具类** @author Supreme_Sir* @version V1.0.0*/
@Component
@Slf4j
public class SseUtil {/*** SSE 超时时间 24小时*/private static final Long TIMEOUT_24_HOUR = 86400000L;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Resourceprivate UnreadMessageCountCacheUtil unreadMessageCountCacheUtil;/*** 订阅SSE*/public SseEmitter subscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter == null) {//生成连接并存储sseEmitter = new SseEmitter(TIMEOUT_24_HOUR);SingletonConcurrentHashMap.INSTANCE.put(deviceType, userId, sseEmitter);}//设置回调函数sseEmitter.onCompletion(completionCallBack(deviceType, userId));sseEmitter.onTimeout(timeoutCallBack(deviceType, userId));sseEmitter.onError(errorCallBack(deviceType, userId));// 立即发送未读消息数量,消除前端等待Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));log.info("用户-{}-{} SSE连接成功", userId, deviceType.getName());return sseEmitter;}/*** 退订消息** @param userId 用户ID*/public String unsubscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {//注意:此方法应由应用程序调用,以完成请求处理。它不应在容器相关事件(如发送时出错)发生后使用。sseEmitter.complete();SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);}return "退订成功";}/*** 发送SSE消息** @param userId 用户ID* @param content 消息内容*/public void sendMessage(Long userId, SseMessageVo content) {for (EnumDeviceType deviceType : EnumDeviceType.values()) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {try {log.info("向用户-{} SSE发送消息-{}", userId, JSON.toJSONString(content));sseEmitter.send(content);} catch (IOException e) {log.error("用户-{}-{} SSE发送消息异常-{}", userId, deviceType.getName(), e.getMessage());SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);log.error("用户-{}-{} SSE发送消息异常被移除", userId, deviceType.getName());}}}}/*** SSE 单向通信心跳检测(需配合定时任务)*/public void heartbeat() {SingletonConcurrentHashMap.INSTANCE.getMap().forEach((key, value) -> {Long userId = extractNumbers(key.toString());Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));});}/*** SSE 连接成功回调** @param userId 用户ID*/private Runnable completionCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> {log.info("用户-{}-{} SSE连接断开", userId, deviceType.getName());SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);});}/*** 出现超时,将当前用户缓存删除** @param userId 用户ID*/private Runnable timeoutCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> {log.error("用户-{}-{} SSE连接超时", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接超时被移除", userId, deviceType.getName());});}/*** 出现异常,将当前用户缓存删除** @param userId 用户ID*/private Consumer<Throwable> errorCallBack(EnumDeviceType deviceType, Long userId) {return throwable -> {log.error("用户-{}-{} SSE连接异常", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接异常被移除", userId, deviceType.getName());};}/*** 截取字符串中的数字** @param input 待截取的字符串*/private Long extractNumbers(String input) {Pattern pattern = Pattern.compile("[a-zA-Z](\\d+)");Matcher matcher = pattern.matcher(input);if (matcher.find()) {// 返回第一个匹配的数字序列return Long.valueOf(matcher.group(1));} else {// 如果没有找到匹配项,可以返回null或抛出异常return null;}}
}
要点:
- 新建好的
SSE对象需要用容器存储起来,以服务于后续消息通信。 - 回调使用
ThreadPool进行管理避免线程过多。 - 一个
SSE对象只能与一端保持通信,如果存在多端的话,需要创建多个对象。
SSE对象单例存储容器 SingletonConcurrentHashMap
import com.enums.EnumDeviceType;
import lombok.Getter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;/*** 基于ConcurrentHashMap的单例版SSE存储容器*/
@Getter
public enum SingletonConcurrentHashMap {/*** 单例版存储容器*/INSTANCE;private final ConcurrentHashMap<Object, SseEmitter> map = new ConcurrentHashMap<>();/*** 存入对象*/public void put(EnumDeviceType deviceType, Object key, SseEmitter value) {map.put(deviceType.getCode() + key, value);}/*** 获取对象*/public SseEmitter get(EnumDeviceType deviceType, Object key) {return map.get(deviceType.getCode() + key);}/*** 判断缓存中是否存在当前用户的SSE实例** @param key 用户ID*/public boolean haveInstance(Object key) {// 分别查询PC、小程序的SSE实例for (EnumDeviceType deviceType : EnumDeviceType.values()) {if (map.get(deviceType.getCode() + key) != null) {return true;}}return false;}/*** 移除对象*/public void remove(EnumDeviceType deviceType, Object key) {map.remove(deviceType.getCode() + key);}/*** 判断是否存在*/public boolean containsKey(EnumDeviceType deviceType, Object key) {return map.containsKey(deviceType.getCode() + key);}/*** 获取对象数量*/public int size() {return map.size();}/*** 清空*/public void clear() {map.clear();}}
心跳数据缓存工具 UnreadMessageCountCacheUtil
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.redis.RedisTemplateUtils;
import com.enums.EnumYesOrNo;
import com.util.RedisKeyUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;/*** @author Supreme_Sir* @description 未读消息条数缓存工具**/
@Component
@Slf4j
public class UnreadMessageCountCacheUtil {@Resourceprivate IDao dao;// 过期时间30分钟private static final Long TIMEOUT = 30L;/*** 添加缓存*/private void put(Long key, Object value) {if (Objects.isNull(key) || Objects.isNull(value)) {return;}RedisTemplateUtils.setCacheObject(RedisKeyUtils.getUnreadMessageCount() + key, value, TIMEOUT, TimeUnit.MINUTES);}/*** 获取缓存(缓存中如果没有则回数据库查询)*/public Long getWithCallBack(Long key) {if (Objects.isNull(key)) {return null;}Object cnt = RedisTemplateUtils.getCacheObject(RedisKeyUtils.getUnreadMessageCount() + key);if (Objects.isNull(cnt)) {cnt = queryCount(key);put(key, cnt);}return Long.valueOf(cnt.toString());}/*** 获取最新缓存** @return {@link Long} 最新未读数据条数*/public Long getWithRefresh(Long key) {if (Objects.isNull(key)) {return null;}Long cnt = queryCount(key);put(key, cnt);return cnt;}/*** 手动刷新缓存*/public void refresh(Long key) {if (Objects.isNull(key)) {return;}put(key, queryCount(key));}/*** 回库查询未读消息条数** @param userId 用户ID* @return {@link Long} 未读消息数量*/private Long queryCount(Long userId) {QueryWrapper<> wrapper = new QueryWrapper<>();// 连接数据库查询数据return dao.selectCount(wrapper);}
}
注意:该缓存工具对象由 Spring 容器管理,以确保单例。
Controller 层代码
@PostMapping(value = "sse/subscribe***", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe***t(@Valid @RequestBody Param param) {return sseUtil.subscribe(EnumDeviceType.PC, param.getId());
}
注意:@PostMapping 的 produces 属性必须为 MediaType.TEXT_EVENT_STREAM_VALUE
前端关键代码
import { fetchEventSource } from '@microsoft/fetch-event-source';
const ctrl = new AbortController();
fetchEventSource(`${env.VITE_API_URL_PREFIX}/xxx/sse/xxx`, {signal: ctrl.signal,method: 'POST',headers: {'Auth-Token': localStorage.getItem(TOKEN_NAME),},body: JSON.stringify({UserID: localStorage.getItem('userID'),}),openWhenHidden: true,onopen: async (event: any) => {console.log('sse open:', event);},onmessage: async (event: any) => {const data = JSON.parse(event.data);this.setMsgCount(data.UnreadMsgCount || 0);console.log('SSE 消息:', data);if (data.Data) {const NotifyInstance = await NotifyPlugin.info({class: 'global-notify-card-wrap',icon: false,duration: 10000,closeBtn: false,offset: [0, 53],content: (h) =>h(MessageBox, {Data: data.Data,onHide: () => {NotifyInstance.close();},}),} as any);}},
});
this.see = {close: () => ctrl.abort(),
};
-------------------------------------------风雨里做个大人,阳光下做个孩子。-------------------------------------------
相关文章:
消息推送只会用websocket、轮询?试试SSE,轻松高效。
SSE介绍 HTTP Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,它允许服务器向客户端推送数据,而无需客户端发起请求。以下是 HTTP SSE 的主要特点: 单向通信: SSE 是一种单向通信协议,服务器可以主动向客户端推送数据,而客户端只能被动接收数据。 持久连接: SS…...
Spring-Retry 框架实战经典重试场景
Spring-Retry框架是Spring自带的功能,具备间隔重试、包含异常、排除异常、控制重试频率等特点,是项目开发中很实用的一种框架。 1、引入依赖 坑点:需要引入AOP,否则会抛异常。 xml <!-- Spring-Retry --> <dependency&…...
人工智能在医疗领域的应用与挑战
随着人工智能技术的不断发展,其在医疗领域的应用也越来越广泛。从辅助诊断到治疗决策,人工智能正在逐步改变着传统的医疗模式。然而,人工智能在医疗领域的应用也面临着诸多挑战,如数据隐私、伦理道德等问题。本文将探讨人工智能在…...
Windows下nmap命令及Zenmap工具的使用方法
一、Nmap简介 nmap是一个网络连接端扫描软件,用来扫描网上电脑开放的网络连接端。确定哪些服务运行在哪些连接端,并且推断计算机运行哪个操作系统(这是亦称 fingerprinting)。它是网络管理员必用的软件之一,以及用以评…...
深入了解-什么是CUDA编程模型
CUDA(Compute Unified Device Architecture,统一计算架构)是NVIDIA推出的一种面向GPU的并行计算平台和编程模型。它允许开发者利用NVIDIA的GPU进行通用目的的并行计算,从而加速应用程序的运行速度。CUDA编程模型为开发者提供了强大…...
111111111111111111
11111111111111111111...
环境如何搭建部署Nacos
这里我使用的是Centos7, Nacos 依赖 Java环境来运行。如果您是从代码开始构建并运行Nacos,还需要为此配置 Maven环境,请确保是在以下版本环境中安装使用 ## 1、下载安装JDK wget https://download.oracle.com/java/17/latest/jdk-17_linux-x6…...
什么是 5G?
什么是 5G? 5G 是第五代无线蜂窝技术,与以前的网络相比,它提供了更高的上传和下载速度、更一致的连接以及更高的容量。5G 比目前流行的 4G 网络更快、更可靠,并有可能改变我们使用互联网访问应用程序、社交网络和信息的方式。例如…...
优化冗余代码:提升前端项目开发效率的实用方法
目录 前言代码复用与组件化模块化开发与代码分割工具辅助与自动化结束语 前言 在前端开发中,我们常常会遇到代码冗余的问题,这不仅增加了代码量,还影响了项目的可维护性和开发效率。还有就是有时候会接到紧急业务需求,要求立马完…...
SpringCloud Alibaba 微服务(四):Sentinel
目录 前言 一、什么是Sentinel? Sentinel 的主要特性 Sentinel 的开源生态 二、Sentinel的核心功能 三、Sentinel 的主要优势与特性 1、丰富的流控规则 2、完善的熔断降级机制 3、实时监控和控制台 4、多数据源支持 5、扩展性强 四、Sentinel 与 Hystrix …...
Python 3.12新功能(1)
Python 3.12正式发布已经很久了,我才将主要电脑的Python版本从3.11升级到最新。最近刚好工作没有那么紧张了,就来领略下这个最新版本中的新特性。 改善了错误消息 Python作为一门编程语言,简单易学容易上手,童叟无欺,深…...
c++STL容器中vector的使用,模拟实现及迭代器使用注意事项和迭代器失效问题
目录 前言: 1.vector的介绍及使用 1.2 vector的使用 1.2 1 vector的定义 1.2 2 vector iterator(迭代器)的使用 1.2.3 vector 空间增长问题 1.2.4 vector 增删查改 1.2.5vector 迭代器失效问题。 2.vector模拟实现 2.1 std::vect…...
Android笔试面试题AI答之Activity常见考点
Activity的常见考点可以总结如下: 生命周期管理:理解Activity在不同情况下(如屏幕旋转、配置更改、用户操作等)的生命周期变化,包括但不限于onCreate、onStart、onResume、onPause、onStop和onDestroy等回调方法。 启…...
RK3568笔记四十九:W25Q64驱动开发(硬件SPI1)
若该文为原创文章,转载请注明原文出处。 一、SPI介绍 串行外设接口 (Serial Peripheral interface) 简称 SPI,是一种高速的,全双工,同步的通信总线,并 且在芯片的管脚上只占用四根线,节约了芯片的管脚。 …...
TypeScript 定义不同的类型(详细示例)
还是大剑师兰特:曾是美国某知名大学计算机专业研究生,现为航空航海领域高级前端工程师;CSDN知名博主,GIS领域优质创作者,深耕openlayers、leaflet、mapbox、cesium,canvas,webgl,ech…...
[工具推荐]前端加解密之Burp插件Galaxy
如果觉得该文章有帮助的,麻烦师傅们可以搜索下微信公众号:良月安全。点个关注,感谢师傅们的支持。 免责声明 本号所发布的所有内容,包括但不限于信息、工具、项目以及文章,均旨在提供学习与研究之用。所有工具安全性…...
课题项目结题测试的作用
课题项目结题测试是课题项目研究过程中的一个重要环节,它对于确保课题项目的质量和成果具有重要的作用。本文将详细介绍课题项目结题测试的作用。 一、确保课题项目质量 课题项目结题测试是对课题项目研究成果的全面评估和检测。通过结题测试,可以对课…...
中国工商银行长春分行开展“工驿幸福 健康财富”长辈客群康养活动
中国工商银行长春分行作为国有大行,持续完善有温度、专业化、安全稳健的养老场景服务,以工行驿站为依托、以长辈客群养老需求为中心,积极对接社区构建敬老、康养的“金融泛金融”工行驿站服务生态,进一步提升长辈客群的到店体验。…...
机器学习 第十四章
目录 前言 一、隐马尔可夫模型 二、马尔可夫随机场 三、条件随机场 四、学习和推断 1.变量消去 2.信念传播 五、近似推断 1.MCMC采样 2.变分推断 六、话题模型 总结 前言 机器学习最重要的任务是根据一些已观察到的证据来对感兴趣的未知变量进行估计和推测。概率模…...
未来RPA财税的发展前景
近年来,全球数字化进程持续提速,越来越多企业受到效率及运营成本的压力,正努力寻求企业增长发展的新路径,而财务作为企业战略的“大脑”,成为企业数字化转型的重要突破口。RPA技术由于能够自动化各种重复性和繁琐的任务…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...
Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案
在大数据时代,海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构,在处理大规模数据抓取任务时展现出强大的能力。然而,随着业务规模的不断扩大和数据抓取需求的日益复杂,传统…...
