消息推送只会用websocket、轮询?试试SSE,轻松高效。
SSE介绍
HTTP Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,它允许服务器向客户端推送数据,而无需客户端发起请求。以下是 HTTP SSE 的主要特点:
单向通信:
SSE 是一种单向通信协议,服务器可以主动向客户端推送数据,而客户端只能被动接收数据。
持久连接:
SSE 使用 HTTP 持久连接(long-lived connection)来保持客户端与服务器之间的连接,避免频繁地重新建立连接。
事件驱动:
SSE 采用事件驱动的方式,服务器将数据封装成事件推送给客户端,客户端可以根据事件类型进行相应的处理。
简单易用:
SSE 的协议简单,基于标准的 HTTP 协议,可以在任何支持 HTTP 的环境中使用。
客户端和服务器端的实现也相对简单,开发成本较低。
可靠性:
SSE 基于 HTTP 协议,可以利用 HTTP 的重试机制来提高数据传输的可靠性。
如果连接断开,客户端可以自动重新连接并恢复数据传输。
浏览器支持:
主流浏览器(Chrome、Firefox、Safari 等)都原生支持 SSE。
对于不支持 SSE 的浏览器,可以使用 polyfill 库来实现兼容性。
应用场景:
SSE 适用于实时性要求较高的场景,如聊天应用、体育赛事直播、股票行情更新等。
与 WebSocket 相比,SSE 更加轻量级,适用于一些对实时性要求不太高但需要持续更新的场景。
总的来说,HTTP SSE 提供了一种简单、可靠、高效的服务器推送机制,可以在各种 Web 应用中得到广泛应用。它是 Web 实时通信技术的一种重要补充。
与websocket对比
HTTP Server-Sent Events (SSE) 和 WebSocket 都是实现服务器与客户端之间实时双向通信的技术,但它们在某些方面存在一些差异。以下是它们的对比:
- 通信模式:
- SSE 是单向通信,服务器只能主动推送数据给客户端,客户端只能被动接收。
- WebSocket 是双向通信,服务器和客户端可以互相发送和接收数据。
- 连接方式:
- SSE 使用标准的 HTTP 连接,利用 HTTP 持久连接来保持连接。
- WebSocket 使用独立的 WebSocket 协议,建立全双工的 TCP 连接。
- 传输协议:
- SSE 使用标准的 HTTP 协议,数据以文本的形式传输。
- WebSocket 使用自己的二进制协议,可以传输二进制数据。
- 浏览器支持:
- SSE 被大多数现代浏览器原生支持。
- WebSocket 也被大多数现代浏览器原生支持。
- 可靠性:
- SSE 可以利用 HTTP 的重试机制来提高数据传输的可靠性。
- WebSocket 建立在 TCP 协议之上,也具有较高的可靠性。
- 实时性:
- SSE 的实时性略低于 WebSocket,因为它需要依赖 HTTP 的连接机制。
- WebSocket 建立在独立的 TCP 连接之上,实时性更高。
- 应用场景:
- SSE 更适合于一些实时性要求不太高但需要持续更新的场景,如聊天应用、体育赛事直播等。
- WebSocket 更适合于需要实时双向通信的场景,如在线游戏、视频会议等。
总的来说,SSE 和 WebSocket 都是实现服务器与客户端实时通信的有效方式,它们各有优缺点,适用于不同的应用场景。在选择时需要根据具体的需求来权衡取舍。
上代码
主体工具类 SseUtil
import com.alibaba.fastjson.JSON;
import com.enums.EnumDeviceType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.Resource;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;/*** SSE 通信工具类** @author Supreme_Sir* @version V1.0.0*/
@Component
@Slf4j
public class SseUtil {/*** SSE 超时时间 24小时*/private static final Long TIMEOUT_24_HOUR = 86400000L;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Resourceprivate UnreadMessageCountCacheUtil unreadMessageCountCacheUtil;/*** 订阅SSE*/public SseEmitter subscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter == null) {//生成连接并存储sseEmitter = new SseEmitter(TIMEOUT_24_HOUR);SingletonConcurrentHashMap.INSTANCE.put(deviceType, userId, sseEmitter);}//设置回调函数sseEmitter.onCompletion(completionCallBack(deviceType, userId));sseEmitter.onTimeout(timeoutCallBack(deviceType, userId));sseEmitter.onError(errorCallBack(deviceType, userId));// 立即发送未读消息数量,消除前端等待Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));log.info("用户-{}-{} SSE连接成功", userId, deviceType.getName());return sseEmitter;}/*** 退订消息** @param userId 用户ID*/public String unsubscribe(EnumDeviceType deviceType, Long userId) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {//注意:此方法应由应用程序调用,以完成请求处理。它不应在容器相关事件(如发送时出错)发生后使用。sseEmitter.complete();SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);}return "退订成功";}/*** 发送SSE消息** @param userId 用户ID* @param content 消息内容*/public void sendMessage(Long userId, SseMessageVo content) {for (EnumDeviceType deviceType : EnumDeviceType.values()) {SseEmitter sseEmitter = SingletonConcurrentHashMap.INSTANCE.get(deviceType, userId);if (sseEmitter != null) {try {log.info("向用户-{} SSE发送消息-{}", userId, JSON.toJSONString(content));sseEmitter.send(content);} catch (IOException e) {log.error("用户-{}-{} SSE发送消息异常-{}", userId, deviceType.getName(), e.getMessage());SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);log.error("用户-{}-{} SSE发送消息异常被移除", userId, deviceType.getName());}}}}/*** SSE 单向通信心跳检测(需配合定时任务)*/public void heartbeat() {SingletonConcurrentHashMap.INSTANCE.getMap().forEach((key, value) -> {Long userId = extractNumbers(key.toString());Long cnt = unreadMessageCountCacheUtil.getWithCallBack(userId);sendMessage(userId, new SseMessageVo(cnt, null));});}/*** SSE 连接成功回调** @param userId 用户ID*/private Runnable completionCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> {log.info("用户-{}-{} SSE连接断开", userId, deviceType.getName());SingletonConcurrentHashMap.INSTANCE.remove(deviceType, userId);});}/*** 出现超时,将当前用户缓存删除** @param userId 用户ID*/private Runnable timeoutCallBack(EnumDeviceType deviceType, Long userId) {return threadPoolTaskExecutor.newThread(() -> {log.error("用户-{}-{} SSE连接超时", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接超时被移除", userId, deviceType.getName());});}/*** 出现异常,将当前用户缓存删除** @param userId 用户ID*/private Consumer<Throwable> errorCallBack(EnumDeviceType deviceType, Long userId) {return throwable -> {log.error("用户-{}-{} SSE连接异常", userId, deviceType.getName());unsubscribe(deviceType, userId);log.error("用户-{}-{} SSE连接异常被移除", userId, deviceType.getName());};}/*** 截取字符串中的数字** @param input 待截取的字符串*/private Long extractNumbers(String input) {Pattern pattern = Pattern.compile("[a-zA-Z](\\d+)");Matcher matcher = pattern.matcher(input);if (matcher.find()) {// 返回第一个匹配的数字序列return Long.valueOf(matcher.group(1));} else {// 如果没有找到匹配项,可以返回null或抛出异常return null;}}
}
要点:
- 新建好的
SSE
对象需要用容器存储起来,以服务于后续消息通信。 - 回调使用
ThreadPool
进行管理避免线程过多。 - 一个
SSE
对象只能与一端保持通信,如果存在多端的话,需要创建多个对象。
SSE对象单例存储容器 SingletonConcurrentHashMap
import com.enums.EnumDeviceType;
import lombok.Getter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;/*** 基于ConcurrentHashMap的单例版SSE存储容器*/
@Getter
public enum SingletonConcurrentHashMap {/*** 单例版存储容器*/INSTANCE;private final ConcurrentHashMap<Object, SseEmitter> map = new ConcurrentHashMap<>();/*** 存入对象*/public void put(EnumDeviceType deviceType, Object key, SseEmitter value) {map.put(deviceType.getCode() + key, value);}/*** 获取对象*/public SseEmitter get(EnumDeviceType deviceType, Object key) {return map.get(deviceType.getCode() + key);}/*** 判断缓存中是否存在当前用户的SSE实例** @param key 用户ID*/public boolean haveInstance(Object key) {// 分别查询PC、小程序的SSE实例for (EnumDeviceType deviceType : EnumDeviceType.values()) {if (map.get(deviceType.getCode() + key) != null) {return true;}}return false;}/*** 移除对象*/public void remove(EnumDeviceType deviceType, Object key) {map.remove(deviceType.getCode() + key);}/*** 判断是否存在*/public boolean containsKey(EnumDeviceType deviceType, Object key) {return map.containsKey(deviceType.getCode() + key);}/*** 获取对象数量*/public int size() {return map.size();}/*** 清空*/public void clear() {map.clear();}}
心跳数据缓存工具 UnreadMessageCountCacheUtil
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.redis.RedisTemplateUtils;
import com.enums.EnumYesOrNo;
import com.util.RedisKeyUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;/*** @author Supreme_Sir* @description 未读消息条数缓存工具**/
@Component
@Slf4j
public class UnreadMessageCountCacheUtil {@Resourceprivate IDao dao;// 过期时间30分钟private static final Long TIMEOUT = 30L;/*** 添加缓存*/private void put(Long key, Object value) {if (Objects.isNull(key) || Objects.isNull(value)) {return;}RedisTemplateUtils.setCacheObject(RedisKeyUtils.getUnreadMessageCount() + key, value, TIMEOUT, TimeUnit.MINUTES);}/*** 获取缓存(缓存中如果没有则回数据库查询)*/public Long getWithCallBack(Long key) {if (Objects.isNull(key)) {return null;}Object cnt = RedisTemplateUtils.getCacheObject(RedisKeyUtils.getUnreadMessageCount() + key);if (Objects.isNull(cnt)) {cnt = queryCount(key);put(key, cnt);}return Long.valueOf(cnt.toString());}/*** 获取最新缓存** @return {@link Long} 最新未读数据条数*/public Long getWithRefresh(Long key) {if (Objects.isNull(key)) {return null;}Long cnt = queryCount(key);put(key, cnt);return cnt;}/*** 手动刷新缓存*/public void refresh(Long key) {if (Objects.isNull(key)) {return;}put(key, queryCount(key));}/*** 回库查询未读消息条数** @param userId 用户ID* @return {@link Long} 未读消息数量*/private Long queryCount(Long userId) {QueryWrapper<> wrapper = new QueryWrapper<>();// 连接数据库查询数据return dao.selectCount(wrapper);}
}
注意:该缓存工具对象由 Spring
容器管理,以确保单例。
Controller 层代码
@PostMapping(value = "sse/subscribe***", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe***t(@Valid @RequestBody Param param) {return sseUtil.subscribe(EnumDeviceType.PC, param.getId());
}
注意:@PostMapping 的 produces 属性必须为 MediaType.TEXT_EVENT_STREAM_VALUE
前端关键代码
import { fetchEventSource } from '@microsoft/fetch-event-source';
const ctrl = new AbortController();
fetchEventSource(`${env.VITE_API_URL_PREFIX}/xxx/sse/xxx`, {signal: ctrl.signal,method: 'POST',headers: {'Auth-Token': localStorage.getItem(TOKEN_NAME),},body: JSON.stringify({UserID: localStorage.getItem('userID'),}),openWhenHidden: true,onopen: async (event: any) => {console.log('sse open:', event);},onmessage: async (event: any) => {const data = JSON.parse(event.data);this.setMsgCount(data.UnreadMsgCount || 0);console.log('SSE 消息:', data);if (data.Data) {const NotifyInstance = await NotifyPlugin.info({class: 'global-notify-card-wrap',icon: false,duration: 10000,closeBtn: false,offset: [0, 53],content: (h) =>h(MessageBox, {Data: data.Data,onHide: () => {NotifyInstance.close();},}),} as any);}},
});
this.see = {close: () => ctrl.abort(),
};
-------------------------------------------风雨里做个大人,阳光下做个孩子。-------------------------------------------
相关文章:
消息推送只会用websocket、轮询?试试SSE,轻松高效。
SSE介绍 HTTP Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,它允许服务器向客户端推送数据,而无需客户端发起请求。以下是 HTTP SSE 的主要特点: 单向通信: SSE 是一种单向通信协议,服务器可以主动向客户端推送数据,而客户端只能被动接收数据。 持久连接: SS…...

Spring-Retry 框架实战经典重试场景
Spring-Retry框架是Spring自带的功能,具备间隔重试、包含异常、排除异常、控制重试频率等特点,是项目开发中很实用的一种框架。 1、引入依赖 坑点:需要引入AOP,否则会抛异常。 xml <!-- Spring-Retry --> <dependency&…...
人工智能在医疗领域的应用与挑战
随着人工智能技术的不断发展,其在医疗领域的应用也越来越广泛。从辅助诊断到治疗决策,人工智能正在逐步改变着传统的医疗模式。然而,人工智能在医疗领域的应用也面临着诸多挑战,如数据隐私、伦理道德等问题。本文将探讨人工智能在…...

Windows下nmap命令及Zenmap工具的使用方法
一、Nmap简介 nmap是一个网络连接端扫描软件,用来扫描网上电脑开放的网络连接端。确定哪些服务运行在哪些连接端,并且推断计算机运行哪个操作系统(这是亦称 fingerprinting)。它是网络管理员必用的软件之一,以及用以评…...
深入了解-什么是CUDA编程模型
CUDA(Compute Unified Device Architecture,统一计算架构)是NVIDIA推出的一种面向GPU的并行计算平台和编程模型。它允许开发者利用NVIDIA的GPU进行通用目的的并行计算,从而加速应用程序的运行速度。CUDA编程模型为开发者提供了强大…...
111111111111111111
11111111111111111111...

环境如何搭建部署Nacos
这里我使用的是Centos7, Nacos 依赖 Java环境来运行。如果您是从代码开始构建并运行Nacos,还需要为此配置 Maven环境,请确保是在以下版本环境中安装使用 ## 1、下载安装JDK wget https://download.oracle.com/java/17/latest/jdk-17_linux-x6…...

什么是 5G?
什么是 5G? 5G 是第五代无线蜂窝技术,与以前的网络相比,它提供了更高的上传和下载速度、更一致的连接以及更高的容量。5G 比目前流行的 4G 网络更快、更可靠,并有可能改变我们使用互联网访问应用程序、社交网络和信息的方式。例如…...

优化冗余代码:提升前端项目开发效率的实用方法
目录 前言代码复用与组件化模块化开发与代码分割工具辅助与自动化结束语 前言 在前端开发中,我们常常会遇到代码冗余的问题,这不仅增加了代码量,还影响了项目的可维护性和开发效率。还有就是有时候会接到紧急业务需求,要求立马完…...

SpringCloud Alibaba 微服务(四):Sentinel
目录 前言 一、什么是Sentinel? Sentinel 的主要特性 Sentinel 的开源生态 二、Sentinel的核心功能 三、Sentinel 的主要优势与特性 1、丰富的流控规则 2、完善的熔断降级机制 3、实时监控和控制台 4、多数据源支持 5、扩展性强 四、Sentinel 与 Hystrix …...
Python 3.12新功能(1)
Python 3.12正式发布已经很久了,我才将主要电脑的Python版本从3.11升级到最新。最近刚好工作没有那么紧张了,就来领略下这个最新版本中的新特性。 改善了错误消息 Python作为一门编程语言,简单易学容易上手,童叟无欺,深…...

c++STL容器中vector的使用,模拟实现及迭代器使用注意事项和迭代器失效问题
目录 前言: 1.vector的介绍及使用 1.2 vector的使用 1.2 1 vector的定义 1.2 2 vector iterator(迭代器)的使用 1.2.3 vector 空间增长问题 1.2.4 vector 增删查改 1.2.5vector 迭代器失效问题。 2.vector模拟实现 2.1 std::vect…...
Android笔试面试题AI答之Activity常见考点
Activity的常见考点可以总结如下: 生命周期管理:理解Activity在不同情况下(如屏幕旋转、配置更改、用户操作等)的生命周期变化,包括但不限于onCreate、onStart、onResume、onPause、onStop和onDestroy等回调方法。 启…...

RK3568笔记四十九:W25Q64驱动开发(硬件SPI1)
若该文为原创文章,转载请注明原文出处。 一、SPI介绍 串行外设接口 (Serial Peripheral interface) 简称 SPI,是一种高速的,全双工,同步的通信总线,并 且在芯片的管脚上只占用四根线,节约了芯片的管脚。 …...

TypeScript 定义不同的类型(详细示例)
还是大剑师兰特:曾是美国某知名大学计算机专业研究生,现为航空航海领域高级前端工程师;CSDN知名博主,GIS领域优质创作者,深耕openlayers、leaflet、mapbox、cesium,canvas,webgl,ech…...

[工具推荐]前端加解密之Burp插件Galaxy
如果觉得该文章有帮助的,麻烦师傅们可以搜索下微信公众号:良月安全。点个关注,感谢师傅们的支持。 免责声明 本号所发布的所有内容,包括但不限于信息、工具、项目以及文章,均旨在提供学习与研究之用。所有工具安全性…...

课题项目结题测试的作用
课题项目结题测试是课题项目研究过程中的一个重要环节,它对于确保课题项目的质量和成果具有重要的作用。本文将详细介绍课题项目结题测试的作用。 一、确保课题项目质量 课题项目结题测试是对课题项目研究成果的全面评估和检测。通过结题测试,可以对课…...

中国工商银行长春分行开展“工驿幸福 健康财富”长辈客群康养活动
中国工商银行长春分行作为国有大行,持续完善有温度、专业化、安全稳健的养老场景服务,以工行驿站为依托、以长辈客群养老需求为中心,积极对接社区构建敬老、康养的“金融泛金融”工行驿站服务生态,进一步提升长辈客群的到店体验。…...
机器学习 第十四章
目录 前言 一、隐马尔可夫模型 二、马尔可夫随机场 三、条件随机场 四、学习和推断 1.变量消去 2.信念传播 五、近似推断 1.MCMC采样 2.变分推断 六、话题模型 总结 前言 机器学习最重要的任务是根据一些已观察到的证据来对感兴趣的未知变量进行估计和推测。概率模…...

未来RPA财税的发展前景
近年来,全球数字化进程持续提速,越来越多企业受到效率及运营成本的压力,正努力寻求企业增长发展的新路径,而财务作为企业战略的“大脑”,成为企业数字化转型的重要突破口。RPA技术由于能够自动化各种重复性和繁琐的任务…...

网络六边形受到攻击
大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...

七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
OD 算法题 B卷【正整数到Excel编号之间的转换】
文章目录 正整数到Excel编号之间的转换 正整数到Excel编号之间的转换 excel的列编号是这样的:a b c … z aa ab ac… az ba bb bc…yz za zb zc …zz aaa aab aac…; 分别代表以下的编号1 2 3 … 26 27 28 29… 52 53 54 55… 676 677 678 679 … 702 703 704 705;…...