当前位置: 首页 > news >正文

使用WebSocket实现log日志流的实时展示-从轮询到通知

场景介绍

最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源,后端没有数据的时候,会造成大量的无用轮询。所以这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。
在这里插入图片描述

WebSocket介绍

参考:https://liaoxuefeng.com/books/java/spring/web/websocket/

WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模型,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当需要定期或不定期向浏览器推送数据(例如股票行情或在线聊天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。

由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上进行了简单的升级。建立 TCP 连接后,浏览器在发送请求时附带以下头部信息:

GET /chat HTTP/1.1
Host: www.example.com
Upgrade: websocket
Connection: Upgrade

这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时互相推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用程序会发送基于 JSON 的文本消息。

现代浏览器均已支持 WebSocket 协议,服务器端则需要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才能使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。

在这里插入图片描述

实践演示

Java后端

我们以实际代码来演示如何在Springboot项目中实现对Websocket的支持。

step1: 添加websocket依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
step2: 增加配置

这个配置的主要作用是自动启动使用了注解==@ServerEndpoint==的类

@Configuration
@EnableWebSocket
public class WebSocketConfiguration {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
step3: 创建一个ws endpoint
@ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}")
@Component
@Slf4j
public class LogWsEndpoint implements Consumer<ChaosLogEvent> {// 对话的标识private String bizKey;// 存储每个会话private static final ConcurrentHashMap<String, List<LogWsEndpoint>> endpointMap = new ConcurrentHashMap<>();// 将会话放入到线程池中,异步将数据返回给前端private static ThreadPoolExecutor wsThreadPoolExecutor;// 核心逻辑处理器private ChaosLogEventHandler handler;// 业务写和读logprivate static ChaosLogger chaosLogger;@Autowired@Qualifier("wsThreadPoolExecutor")public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {if (null != wsThreadPoolExecutor) {LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor;}}@Autowiredpublic void setChaosLogger(ChaosLogger chaosLogger) {if (null != chaosLogger) {LogWsEndpoint.chaosLogger = chaosLogger;}}@OnOpenpublic void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) {this.bizKey = String.format("%s-%s", bizType, bizId);log.info("[ws-chaos-log]连接建立中 ==> bizKey : {}", bizKey);this.handler = new ChaosLogEventHandler(chaosLogger, session);wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true));endpointMap.compute(bizKey, (key, value) -> {List<LogWsEndpoint> ends = null == value ? new ArrayList<>() : value;ends.add(this);return ends;});log.info("[ws-chaos-log]连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey);}public void flushMessage(String bizType, String bizId, boolean force) {this.handler.flushMessage(bizType, bizId, force);}@OnClosepublic void onClose() {log.info("websocket log server close");if (StringUtils.isBlank(bizKey)) {return;}endpointMap.compute(bizKey, (key, endpoints) -> {if (null != endpoints) {endpoints.remove(this);}return endpoints;});log.info("[ws-chaos-log]连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey);}@OnMessagepublic void onMessage(String message, Session session) throws IOException {log.info("[ws-chaos-log]服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message);}@OnErrorpublic void onError(Session session, Throwable error) {log.error("[ws-chaos-log]WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey);}@Overridepublic void accept(ChaosLogEvent chaosLogEvent) {String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId());log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent));List<LogWsEndpoint> logWsEndpoints = endpointMap.get(contextId);if (CollectionUtils.isEmpty(logWsEndpoints)) {return;}logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));}
}

==注意:上面有个accept()==方法,这个方法后面也会讲到,主要就是用于触发已经建立连接Websocket发送消息。

核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。

@Slf4j
public class ChaosLogEventHandler {private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5Mprivate final ChaosLogger chaosLogger;@Getterprivate final Session session;private final AtomicLong offset = new AtomicLong(-1L);private final AtomicBoolean hasTruncated = new AtomicBoolean(false);private final AtomicLong waitEventCnt = new AtomicLong(0L);private final Lock lock = new ReentrantLock();public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {this.chaosLogger = chaosLogger;this.session = session;}public void flushMessage(String bizType, String bizId, boolean force) {String bizKey = bizType + "-" + bizId;if (!lock.tryLock()) {waitEventCnt.incrementAndGet();log.info("[WS]获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);return;}try {if (!force && waitEventCnt.getAndSet(0L) < 1) {log.info("[ws-chaos-log]没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);// 没有待处理的事件return;}log.info("[ws-chaos-log]向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get());if (offset.get() < 0) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);log.info("[ws-chaos-log]contentLength = {} for bizLogKey {}", contentLength, bizKey);if (contentLength == 0) {return;}if (contentLength > READ_LOG_MOST_LEN) {offset.set(contentLength - READ_LOG_MOST_LEN);hasTruncated.set(true);log.info("[ws-chaos-log]文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());} else {offset.set(0L);}} else if (!force) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);if (contentLength <= offset.get()) {log.info("[ws-chaos-log]文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());return;}}// 读取日志内容BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {String line = null;while (null != (line = reader.readLine())) {if (hasTruncated.get()) {hasTruncated.set(false);log.info("[ws-chaos-log]hasTruncated changed to false");} else {log.info("[ws-chaos-log]send ws msg:{}", line);try {session.getBasicRemote().sendText(line + "\n");} catch (IllegalStateException e) {log.info("[ws-chaos-log]发送消息过程中连接状态异常,跳过", e);}}// +1是因为每一行结尾会有一个回车offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1);}} catch (IOException e) {log.error("", e);}} catch (NotFoundException e) {log.info("[ws-chaos-log]未找到数据,无需向客户端同步,bizKey:{}", bizKey, e);} catch (RuntimeException e) {log.error("", e);} finally {lock.unlock();}log.info("[ws-chaos-log]向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey);// 在处理过程中,可能又有新的事件,所以再次尝试刷新数据flushMessage(bizType, bizKey, false);}
}
stept5: 广播事件,全局监听

前后端建立连接的时候,绑定了后端一台机器,但是后台一般都是多台服务器,如果事件传递到其他服务器,那么已经建立的连接如何监听到并返回内呢?

这里使用了rocketmq的机制,每台机器都会监听到事件的变化,从而触发当前机器将变更内容发回到前端。

@Component
@RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class ChaosLogEventConsumer implements RocketMQListener<String> {@Autowired(required = false)private List<Consumer<ChaosLogEvent>> chaosLogEventConsumers = Collections.emptyList();@Overridepublic void onMessage(String message) {log.info("[MQ]receive ChaosLogEvent message:{}", message);ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class);for (Consumer<ChaosLogEvent> consumer : chaosLogEventConsumers) {try {consumer.accept(event);} catch (RuntimeException e) {log.error("[MQ] failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e);}}}
}

前端代码实现

以react为例,仅供参考:

export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => {if (!bizType || !bizId) {console.log('fetchLogs: logContextToken or node is null')return}setLogs([])if (logsRef.current[0]) {console.log('close ws')logsRef.current[0].close()}let host = wsHost ? wsHost : window.location.hostlet protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`)logsRef.current = [client, []]// 报错的回调函数client.onerror = (e: any) => {console.log('Connection Error')console.log(e)}//链接打开的回调函数client.onopen = () => {console.log('WebSocket Client Connected')}//链接关闭的回调函数client.onclose = () => {console.log('echo-protocol Client Closed')}//收到消息的处理函数client.onmessage = (e: any) => {if (logsRef.current[0] === client) {if (typeof e.data === 'string') {let newLogs = [...logsRef.current[1], e.data]if (newLogs.length > 250) {newLogs = newLogs.slice(200)}setLogs(newLogs)logsRef.current = [client, newLogs]}} else {client.close()}}const sendPing = () => {if (logsRef.current[0] === client) {const data = { message: 'heartbeat' }client.send(JSON.stringify(data))setTimeout(sendPing, 10000)}}setTimeout(sendPing, 10000)
}

相关文章:

使用WebSocket实现log日志流的实时展示-从轮询到通知

场景介绍 最近开发一个系统&#xff0c;其中一个模块需要展示实时的执行过程&#xff0c;过程日志可能比较多。以前的方案都是前端定时轮询&#xff0c;比如每秒查一次后端接口&#xff0c;将拉取回来的日志重新展示。轮询方案简单容易实现&#xff0c;但是比较消耗资源&#…...

UE5 从零开始制作跟随的大鹅

文章目录 二、绑定骨骼三、创建 ControlRig四、创建动画五、创建动画蓝图六、自动寻路七、生成 goose八、碰撞 和 Physics Asset缺点 # 一、下载模型 首先我们需要下载一个静态网格体&#xff0c;这里我们可以从 Sketchfab 中下载&#xff1a;Goose Low Poly - Download Free …...

O’Reilly

--江上往来人&#xff0c;但爱鲈鱼美。 --君看一叶舟&#xff0c;出没风波里。 OReilly OReilly出版社出版的技术类图书 俗称动物系列 应该是每个技术人员的必备手册。 OReilly动物系列&#xff08;中译本&#xff09; 简介" 动物系列作为 OReilly 书籍的典型代表被普遍…...

优盘驱动器未格式化:数据拯救行动指南

优盘困境&#xff1a;驱动器未格式化的挑战 在日常的数据存储与传输中&#xff0c;优盘以其便携性和高容量成为了我们不可或缺的伙伴。然而&#xff0c;当您尝试访问优盘时&#xff0c;突然弹出的“驱动器未被格式化”提示却如同晴天霹雳&#xff0c;让人措手不及。这一状况不…...

4.Handler mappings

处理程序映射 简介 在早期版本的 Spring 中&#xff0c;用户需要在 Web 应用程序上下文中定义一个或多个 HandlerMapping bean 以将传入的 Web 请求映射到适当的处理程序。随着注解控制器的引入&#xff0c;通常不再需要这样做&#xff0c;因为 RequestMappingHandlerMapping…...

《学会 SpringMVC 系列 · 消息转换器 MessageConverters》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…...

深度学习项目 -7-使用 Python 的手写数字识别

一、前言 该文章仅作为个人学习使用 二、正文 项目源代码&#xff1a;深度学习项目 - 使用 Python 进行手写数字识别 - DataFlair (data-flair.training) 数据集&#xff1a;​​​​​​​https://drive.google.com/open?id1hJiOlxctFH3uL2yTqXU_1f6c0zLr8V_K Python 深…...

MySQL —— 库,数据类型 与 表

库与基础操作 1.1 查看数据库 使用 show databases; 可以查看当前 MySQL 目前有多少个数据库 5 rows 表示有 5 行&#xff0c;这里是表示的是有效的数据&#xff0c;不包括 第一行的指引 set 表示结果集合 0.01 sec 表示这个 sql 语句一共运行了0.01 秒&#xff0c;一般情况…...

Java重修笔记 第二十七天 匿名内部类

匿名内部类 1. 定义&#xff1a;无类名&#xff08;底层自动分配类名“外部类名$1”&#xff09;&#xff0c;既是类也是对象&#xff0c;定义在外部类的局部位置&#xff0c;例如方法体和代码块中&#xff0c;通过new类或接口并在大括号里重写方法来实现。 2. 使用场景&…...

Nero Lens 智图 - 适用于 iOS 和 iPadOS 的专业图片处理 App

首先是手机端的无损放大 App&#xff1a;Nero Lens 智图&#xff0c;适用于 iOS 和 iPadOS&#xff0c;不仅可以放大&#xff0c;还有多种 AI 图片增强功能。 使用这款 App 可以通过 AI 模型智能放大可达 400%&#xff0c;还有老照片去划痕、上色&#xff0c;抠图移除背景、照…...

Nginx代理路径被吃

Nginx代理路径被吃的情况 日常工作中经常使用nginx反向代理一些资源&#xff0c;有时正常代理&#xff0c;发现代理不过去。 验证被吃调location情况 通过浏览器访问&#xff1a; https://zhao138969.com/LinuxPackage/Python/SelectDocker location /LinuxPackage { proxy…...

pytest-html报告修改与汉化

前言 Pytest框架可以使用两种测试报告&#xff0c;其中一种就是使用pytest-html插件生成的测试报告&#xff0c;但是报告中有一些信息没有什么用途或者显示的不太好看&#xff0c;还有一些我们想要在报告中展示的信息却没有&#xff0c;最近又有人问我pytest-html生成的报告&a…...

react-native从入门到实战系列教程一Swiper组件的使用及bug修复

轮播图&#xff0c;在app中随处可见&#xff0c;这么重要的功能我们怎么可能不学习下在react-native中的实现方式。 依然是第三方组件react-native-swiper 官网地址 https://www.npmjs.com/package/react-native-swiper 组件使用的组件及事件参考官方即可。 实现效果 官网…...

springboot开发的常用注解总结-配置组件类注解

Spring Boot 提供了许多注解&#xff0c;这些注解大大简化了 Spring 应用的配置和开发过程。以下是一些常见的 Spring Boot注解及其作用。 目录 配置组件类 &#xff08;Configure Component &#xff09;Configuration解释&#xff1a;Demo Code&#xff1a;更深度使用&#x…...

DataX 最新版本安装部署

1、下载 git clone gitgithub.com:alibaba/DataX.git 2、打包 mvn -U clean package assembly:assembly -Dmaven.test.skiptrue...

【架构】应用保护

这篇文章总结一下应用保护的手段。如今说到应用保护&#xff0c;更多的会想到阿里的sentinel&#xff0c;手段丰富&#xff0c;应用简单。sentinel的限流、降级、熔断&#xff0c;可以自己去试一下&#xff0c;sentinel主要通过配置实现功能&#xff0c;不难。sentinel的简介放…...

从核心到边界:六边形、洋葱与COLA架构的深度解析

文章目录 1 引言2 软件架构3 架构分类4 典型的应用架构4.1 分层架构4.2 CQRS4.3 六边形架构4.4 洋葱架构4.5 DDD 5 COLA架构设计5.1 分层设计5.2 扩展设计5.3 规范设计5.3.1 组件规范5.3.2 包规范5.3.3 命名规范 6 COLA架构总览7 小结 1 引言 软件的首要技术使命&#xff1a;管…...

04-Fastjson反序列化漏洞

免责声明 本文仅限于学习讨论与技术知识的分享&#xff0c;不得违反当地国家的法律法规。对于传播、利用文章中提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;本文作者不为此承担任何责任&#xff0c;一旦造成后果请自行承担&…...

ABC365(A-D)未补

A - Leap Year&#xff08;模拟&#xff09; 题意&#xff1a;给定一个数字n&#xff0c;如果n不是4的倍数&#xff0c;输出365&#xff1b;如果n是4的倍数但不是100的倍数&#xff0c;输出366&#xff1b;如果n是100的倍数但不是400的倍数&#xff0c;输出365&#xff1b;如果…...

Python用png生成不同尺寸的图标

Kimi生成 from PIL import Imagedef generate_icon(source_image_path, output_image_path, size):with Image.open(source_image_path) as img:# 转换图片为RGBA模式&#xff0c;确保有透明通道if img.mode ! RGBA:img img.convert(RGBA)# 调整图片大小到指定尺寸img img.r…...

如何添加超链接_a标签href属性详解【详解】

给 <a> 标签加跳转地址只需写 href 属性&#xff0c;它是唯一必需属性&#xff1b;缺它则仅为普通文本&#xff0c;不可点击且不被识别为链接。怎么给 <a> 标签加跳转地址直接写 href 属性就行&#xff0c;这是唯一必需的属性。没它&#xff0c;<a> 就只是个…...

告别纯仿真:手把手教你将Isaac Gym训练的TRON1 RL策略部署到真机并遥控行走

从虚拟到现实&#xff1a;TRON1机器人强化学习策略的真机部署实战指南 当你在Isaac Gym中看着TRON1机器人完美执行各种行走动作时&#xff0c;是否想过让这些虚拟策略在真实世界中"活"起来&#xff1f;仿真环境中的成功只是第一步&#xff0c;真正的挑战在于如何跨越…...

李佳琦后退,美ONE在赌一场没有“顶流”的未来

超头退潮下&#xff0c;MCN的生死命题。文&#xff5c;段泽钰编&#xff5c;郭梦仪4月8日&#xff0c;李佳琦在直播中宣布“将缺席两个月的直播”。几个小时后&#xff0c;这条消息登上热搜。他不得不紧急澄清&#xff1a;是两个月&#xff0c;不是两个季度&#xff0c;缺席是去…...

训练-推理全链路能耗暴增预警,深度解析视觉-语言-音频三模态对齐中的冗余计算黑洞(附热力图诊断模板)

第一章&#xff1a;训练-推理全链路能耗暴增预警机制构建 2026奇点智能技术大会(https://ml-summit.org) 现代大模型全生命周期中&#xff0c;训练与推理阶段的能耗已突破传统监控阈值。单次千亿参数模型训练峰值功耗可达12MW&#xff0c;而在线推理集群在流量洪峰期的PUE波动…...

LFE并发编程:如何利用Erlang OTP构建高可用系统

LFE并发编程&#xff1a;如何利用Erlang OTP构建高可用系统 【免费下载链接】lfe Lisp Flavoured Erlang (LFE) 项目地址: https://gitcode.com/gh_mirrors/lf/lfe Lisp Flavoured Erlang (LFE) 是结合了Lisp语法和Erlang强大并发能力的编程语言&#xff0c;它允许开发者…...

基于深度学习的苹果叶片病虫害识别系统,resnet50,vgg16,resnet34【pytorch框架,python源码】

更多图像分类、图像识别、目标检测、图像分割&#xff0c;图像检索等项目可从主页查看 功能演示(要看shi pin下面的简介)&#xff1a; https://www.bilibili.com/video/BV1Bs4XzcEdH/?spm_id_from333.1387.homepage.video_card.click&vd_source95b9b70984596ccebdb2780f0…...

动手学深度学习——注意力机制代码

1. 前言上一篇我们已经从思想上理解了注意力机制&#xff1a;基础 Seq2Seq 的问题在于固定长度上下文向量解码器在不同时间步&#xff0c;其实应该关注输入序列的不同位置注意力机制的本质&#xff0c;就是对输入表示做加权和权重由当前位置和各输入位置的相关性决定这一篇就继…...

保姆级教程:用Python+Matlab从零推导Panda机械臂的DH参数与正运动学

从零推导Panda机械臂&#xff1a;用Python和Matlab实现DH建模与正运动学验证 第一次接触机械臂运动学时&#xff0c;我被那些复杂的矩阵变换和参数定义搞得晕头转向。直到亲手用代码实现了一个完整的正运动学推导流程&#xff0c;才发现原来理解DH参数和坐标系变换可以如此直观…...

数据工单打标前沿技术汇总:两阶段/多阶段流水线, RAG增强分类

数据工单打标前沿技术汇总 目录 数据工单打标前沿技术汇总一、技术背景与主流趋势二、使用API调用方式的代表性论文2.1 REIC: RAG-Enhanced Intent Classification at Scale (EMNLP 2025 Industry Track)2.2 TickIt: Leveraging Large Language Models for Automated Ticket Es…...

基于LDAP与AES加密的企业级登录认证方案实践

1. 企业级登录认证的挑战与解决方案 在企业级应用开发中&#xff0c;登录认证系统往往面临多重挑战。特别是当系统需要同时支持内部员工和外协人员访问时&#xff0c;如何确保安全性、统一性和易用性就成为了关键问题。我最近参与的一个金融项目就遇到了这样的场景&#xff1a;…...