使用WebSocket实现log日志流的实时展示-从轮询到通知
场景介绍
最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源,后端没有数据的时候,会造成大量的无用轮询。所以这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。

WebSocket介绍
参考:https://liaoxuefeng.com/books/java/spring/web/websocket/
WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模型,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当需要定期或不定期向浏览器推送数据(例如股票行情或在线聊天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。
由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上进行了简单的升级。建立 TCP 连接后,浏览器在发送请求时附带以下头部信息:
GET /chat HTTP/1.1
Host: www.example.com
Upgrade: websocket
Connection: Upgrade
这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时互相推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用程序会发送基于 JSON 的文本消息。
现代浏览器均已支持 WebSocket 协议,服务器端则需要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才能使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。

实践演示
Java后端
我们以实际代码来演示如何在Springboot项目中实现对Websocket的支持。
step1: 添加websocket依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
step2: 增加配置
这个配置的主要作用是自动启动使用了注解==@ServerEndpoint==的类
@Configuration
@EnableWebSocket
public class WebSocketConfiguration {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
step3: 创建一个ws endpoint
@ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}")
@Component
@Slf4j
public class LogWsEndpoint implements Consumer<ChaosLogEvent> {// 对话的标识private String bizKey;// 存储每个会话private static final ConcurrentHashMap<String, List<LogWsEndpoint>> endpointMap = new ConcurrentHashMap<>();// 将会话放入到线程池中,异步将数据返回给前端private static ThreadPoolExecutor wsThreadPoolExecutor;// 核心逻辑处理器private ChaosLogEventHandler handler;// 业务写和读logprivate static ChaosLogger chaosLogger;@Autowired@Qualifier("wsThreadPoolExecutor")public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {if (null != wsThreadPoolExecutor) {LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor;}}@Autowiredpublic void setChaosLogger(ChaosLogger chaosLogger) {if (null != chaosLogger) {LogWsEndpoint.chaosLogger = chaosLogger;}}@OnOpenpublic void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) {this.bizKey = String.format("%s-%s", bizType, bizId);log.info("[ws-chaos-log]连接建立中 ==> bizKey : {}", bizKey);this.handler = new ChaosLogEventHandler(chaosLogger, session);wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true));endpointMap.compute(bizKey, (key, value) -> {List<LogWsEndpoint> ends = null == value ? new ArrayList<>() : value;ends.add(this);return ends;});log.info("[ws-chaos-log]连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey);}public void flushMessage(String bizType, String bizId, boolean force) {this.handler.flushMessage(bizType, bizId, force);}@OnClosepublic void onClose() {log.info("websocket log server close");if (StringUtils.isBlank(bizKey)) {return;}endpointMap.compute(bizKey, (key, endpoints) -> {if (null != endpoints) {endpoints.remove(this);}return endpoints;});log.info("[ws-chaos-log]连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey);}@OnMessagepublic void onMessage(String message, Session session) throws IOException {log.info("[ws-chaos-log]服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message);}@OnErrorpublic void onError(Session session, Throwable error) {log.error("[ws-chaos-log]WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey);}@Overridepublic void accept(ChaosLogEvent chaosLogEvent) {String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId());log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent));List<LogWsEndpoint> logWsEndpoints = endpointMap.get(contextId);if (CollectionUtils.isEmpty(logWsEndpoints)) {return;}logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));}
}
==注意:上面有个accept()==方法,这个方法后面也会讲到,主要就是用于触发已经建立连接Websocket发送消息。
核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。
@Slf4j
public class ChaosLogEventHandler {private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5Mprivate final ChaosLogger chaosLogger;@Getterprivate final Session session;private final AtomicLong offset = new AtomicLong(-1L);private final AtomicBoolean hasTruncated = new AtomicBoolean(false);private final AtomicLong waitEventCnt = new AtomicLong(0L);private final Lock lock = new ReentrantLock();public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {this.chaosLogger = chaosLogger;this.session = session;}public void flushMessage(String bizType, String bizId, boolean force) {String bizKey = bizType + "-" + bizId;if (!lock.tryLock()) {waitEventCnt.incrementAndGet();log.info("[WS]获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);return;}try {if (!force && waitEventCnt.getAndSet(0L) < 1) {log.info("[ws-chaos-log]没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);// 没有待处理的事件return;}log.info("[ws-chaos-log]向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get());if (offset.get() < 0) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);log.info("[ws-chaos-log]contentLength = {} for bizLogKey {}", contentLength, bizKey);if (contentLength == 0) {return;}if (contentLength > READ_LOG_MOST_LEN) {offset.set(contentLength - READ_LOG_MOST_LEN);hasTruncated.set(true);log.info("[ws-chaos-log]文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());} else {offset.set(0L);}} else if (!force) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);if (contentLength <= offset.get()) {log.info("[ws-chaos-log]文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());return;}}// 读取日志内容BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {String line = null;while (null != (line = reader.readLine())) {if (hasTruncated.get()) {hasTruncated.set(false);log.info("[ws-chaos-log]hasTruncated changed to false");} else {log.info("[ws-chaos-log]send ws msg:{}", line);try {session.getBasicRemote().sendText(line + "\n");} catch (IllegalStateException e) {log.info("[ws-chaos-log]发送消息过程中连接状态异常,跳过", e);}}// +1是因为每一行结尾会有一个回车offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1);}} catch (IOException e) {log.error("", e);}} catch (NotFoundException e) {log.info("[ws-chaos-log]未找到数据,无需向客户端同步,bizKey:{}", bizKey, e);} catch (RuntimeException e) {log.error("", e);} finally {lock.unlock();}log.info("[ws-chaos-log]向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey);// 在处理过程中,可能又有新的事件,所以再次尝试刷新数据flushMessage(bizType, bizKey, false);}
}
stept5: 广播事件,全局监听
前后端建立连接的时候,绑定了后端一台机器,但是后台一般都是多台服务器,如果事件传递到其他服务器,那么已经建立的连接如何监听到并返回内呢?
这里使用了rocketmq的机制,每台机器都会监听到事件的变化,从而触发当前机器将变更内容发回到前端。
@Component
@RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class ChaosLogEventConsumer implements RocketMQListener<String> {@Autowired(required = false)private List<Consumer<ChaosLogEvent>> chaosLogEventConsumers = Collections.emptyList();@Overridepublic void onMessage(String message) {log.info("[MQ]receive ChaosLogEvent message:{}", message);ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class);for (Consumer<ChaosLogEvent> consumer : chaosLogEventConsumers) {try {consumer.accept(event);} catch (RuntimeException e) {log.error("[MQ] failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e);}}}
}
前端代码实现
以react为例,仅供参考:
export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => {if (!bizType || !bizId) {console.log('fetchLogs: logContextToken or node is null')return}setLogs([])if (logsRef.current[0]) {console.log('close ws')logsRef.current[0].close()}let host = wsHost ? wsHost : window.location.hostlet protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`)logsRef.current = [client, []]// 报错的回调函数client.onerror = (e: any) => {console.log('Connection Error')console.log(e)}//链接打开的回调函数client.onopen = () => {console.log('WebSocket Client Connected')}//链接关闭的回调函数client.onclose = () => {console.log('echo-protocol Client Closed')}//收到消息的处理函数client.onmessage = (e: any) => {if (logsRef.current[0] === client) {if (typeof e.data === 'string') {let newLogs = [...logsRef.current[1], e.data]if (newLogs.length > 250) {newLogs = newLogs.slice(200)}setLogs(newLogs)logsRef.current = [client, newLogs]}} else {client.close()}}const sendPing = () => {if (logsRef.current[0] === client) {const data = { message: 'heartbeat' }client.send(JSON.stringify(data))setTimeout(sendPing, 10000)}}setTimeout(sendPing, 10000)
}
相关文章:
使用WebSocket实现log日志流的实时展示-从轮询到通知
场景介绍 最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源&#…...
UE5 从零开始制作跟随的大鹅
文章目录 二、绑定骨骼三、创建 ControlRig四、创建动画五、创建动画蓝图六、自动寻路七、生成 goose八、碰撞 和 Physics Asset缺点 # 一、下载模型 首先我们需要下载一个静态网格体,这里我们可以从 Sketchfab 中下载:Goose Low Poly - Download Free …...
O’Reilly
--江上往来人,但爱鲈鱼美。 --君看一叶舟,出没风波里。 OReilly OReilly出版社出版的技术类图书 俗称动物系列 应该是每个技术人员的必备手册。 OReilly动物系列(中译本) 简介" 动物系列作为 OReilly 书籍的典型代表被普遍…...
优盘驱动器未格式化:数据拯救行动指南
优盘困境:驱动器未格式化的挑战 在日常的数据存储与传输中,优盘以其便携性和高容量成为了我们不可或缺的伙伴。然而,当您尝试访问优盘时,突然弹出的“驱动器未被格式化”提示却如同晴天霹雳,让人措手不及。这一状况不…...
4.Handler mappings
处理程序映射 简介 在早期版本的 Spring 中,用户需要在 Web 应用程序上下文中定义一个或多个 HandlerMapping bean 以将传入的 Web 请求映射到适当的处理程序。随着注解控制器的引入,通常不再需要这样做,因为 RequestMappingHandlerMapping…...
《学会 SpringMVC 系列 · 消息转换器 MessageConverters》
📢 大家好,我是 【战神刘玉栋】,有10多年的研发经验,致力于前后端技术栈的知识沉淀和传播。 💗 🌻 CSDN入驻不久,希望大家多多支持,后续会继续提升文章质量,绝不滥竽充数…...
深度学习项目 -7-使用 Python 的手写数字识别
一、前言 该文章仅作为个人学习使用 二、正文 项目源代码:深度学习项目 - 使用 Python 进行手写数字识别 - DataFlair (data-flair.training) 数据集:https://drive.google.com/open?id1hJiOlxctFH3uL2yTqXU_1f6c0zLr8V_K Python 深…...
MySQL —— 库,数据类型 与 表
库与基础操作 1.1 查看数据库 使用 show databases; 可以查看当前 MySQL 目前有多少个数据库 5 rows 表示有 5 行,这里是表示的是有效的数据,不包括 第一行的指引 set 表示结果集合 0.01 sec 表示这个 sql 语句一共运行了0.01 秒,一般情况…...
Java重修笔记 第二十七天 匿名内部类
匿名内部类 1. 定义:无类名(底层自动分配类名“外部类名$1”),既是类也是对象,定义在外部类的局部位置,例如方法体和代码块中,通过new类或接口并在大括号里重写方法来实现。 2. 使用场景&…...
Nero Lens 智图 - 适用于 iOS 和 iPadOS 的专业图片处理 App
首先是手机端的无损放大 App:Nero Lens 智图,适用于 iOS 和 iPadOS,不仅可以放大,还有多种 AI 图片增强功能。 使用这款 App 可以通过 AI 模型智能放大可达 400%,还有老照片去划痕、上色,抠图移除背景、照…...
Nginx代理路径被吃
Nginx代理路径被吃的情况 日常工作中经常使用nginx反向代理一些资源,有时正常代理,发现代理不过去。 验证被吃调location情况 通过浏览器访问: https://zhao138969.com/LinuxPackage/Python/SelectDocker location /LinuxPackage { proxy…...
pytest-html报告修改与汉化
前言 Pytest框架可以使用两种测试报告,其中一种就是使用pytest-html插件生成的测试报告,但是报告中有一些信息没有什么用途或者显示的不太好看,还有一些我们想要在报告中展示的信息却没有,最近又有人问我pytest-html生成的报告&a…...
react-native从入门到实战系列教程一Swiper组件的使用及bug修复
轮播图,在app中随处可见,这么重要的功能我们怎么可能不学习下在react-native中的实现方式。 依然是第三方组件react-native-swiper 官网地址 https://www.npmjs.com/package/react-native-swiper 组件使用的组件及事件参考官方即可。 实现效果 官网…...
springboot开发的常用注解总结-配置组件类注解
Spring Boot 提供了许多注解,这些注解大大简化了 Spring 应用的配置和开发过程。以下是一些常见的 Spring Boot注解及其作用。 目录 配置组件类 (Configure Component )Configuration解释:Demo Code:更深度使用&#x…...
DataX 最新版本安装部署
1、下载 git clone gitgithub.com:alibaba/DataX.git 2、打包 mvn -U clean package assembly:assembly -Dmaven.test.skiptrue...
【架构】应用保护
这篇文章总结一下应用保护的手段。如今说到应用保护,更多的会想到阿里的sentinel,手段丰富,应用简单。sentinel的限流、降级、熔断,可以自己去试一下,sentinel主要通过配置实现功能,不难。sentinel的简介放…...
从核心到边界:六边形、洋葱与COLA架构的深度解析
文章目录 1 引言2 软件架构3 架构分类4 典型的应用架构4.1 分层架构4.2 CQRS4.3 六边形架构4.4 洋葱架构4.5 DDD 5 COLA架构设计5.1 分层设计5.2 扩展设计5.3 规范设计5.3.1 组件规范5.3.2 包规范5.3.3 命名规范 6 COLA架构总览7 小结 1 引言 软件的首要技术使命:管…...
04-Fastjson反序列化漏洞
免责声明 本文仅限于学习讨论与技术知识的分享,不得违反当地国家的法律法规。对于传播、利用文章中提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,本文作者不为此承担任何责任,一旦造成后果请自行承担&…...
ABC365(A-D)未补
A - Leap Year(模拟) 题意:给定一个数字n,如果n不是4的倍数,输出365;如果n是4的倍数但不是100的倍数,输出366;如果n是100的倍数但不是400的倍数,输出365;如果…...
Python用png生成不同尺寸的图标
Kimi生成 from PIL import Imagedef generate_icon(source_image_path, output_image_path, size):with Image.open(source_image_path) as img:# 转换图片为RGBA模式,确保有透明通道if img.mode ! RGBA:img img.convert(RGBA)# 调整图片大小到指定尺寸img img.r…...
23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
MySQL:分区的基本使用
目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区(Partitioning)是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分(分区)可以独立存储、管理和优化,…...
快速排序算法改进:随机快排-荷兰国旗划分详解
随机快速排序-荷兰国旗划分算法详解 一、基础知识回顾1.1 快速排序简介1.2 荷兰国旗问题 二、随机快排 - 荷兰国旗划分原理2.1 随机化枢轴选择2.2 荷兰国旗划分过程2.3 结合随机快排与荷兰国旗划分 三、代码实现3.1 Python实现3.2 Java实现3.3 C实现 四、性能分析4.1 时间复杂度…...
WebRTC调研
WebRTC是什么,为什么,如何使用 WebRTC有什么优势 WebRTC Architecture Amazon KVS WebRTC 其它厂商WebRTC 海康门禁WebRTC 海康门禁其他界面整理 威视通WebRTC 局域网 Google浏览器 Microsoft Edge 公网 RTSP RTMP NVR ONVIF SIP SRT WebRTC协…...
UE5 音效系统
一.音效管理 音乐一般都是WAV,创建一个背景音乐类SoudClass,一个音效类SoundClass。所有的音乐都分为这两个类。再创建一个总音乐类,将上述两个作为它的子类。 接着我们创建一个音乐混合类SoundMix,将上述三个类翻入其中,通过它管理每个音乐…...
【题解-洛谷】P10480 可达性统计
题目:P10480 可达性统计 题目描述 给定一张 N N N 个点 M M M 条边的有向无环图,分别统计从每个点出发能够到达的点的数量。 输入格式 第一行两个整数 N , M N,M N,M,接下来 M M M 行每行两个整数 x , y x,y x,y,表示从 …...
【Redis】Redis从入门到实战:全面指南
Redis从入门到实战:全面指南 一、Redis简介 Redis(Remote Dictionary Server)是一个开源的、基于内存的键值存储系统,它可以用作数据库、缓存和消息代理。由Salvatore Sanfilippo于2009年开发,因其高性能、丰富的数据结构和广泛的语言支持而广受欢迎。 Redis核心特点:…...
简单聊下阿里云DNS劫持事件
阿里云域名被DNS劫持事件 事件总结 根据ICANN规则,域名注册商(Verisign)认定aliyuncs.com域名下的部分网站被用于非法活动(如传播恶意软件);顶级域名DNS服务器将aliyuncs.com域名的DNS记录统一解析到shado…...
