使用WebSocket实现log日志流的实时展示-从轮询到通知
场景介绍
最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源,后端没有数据的时候,会造成大量的无用轮询。所以这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。

WebSocket介绍
参考:https://liaoxuefeng.com/books/java/spring/web/websocket/
WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模型,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当需要定期或不定期向浏览器推送数据(例如股票行情或在线聊天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。
由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上进行了简单的升级。建立 TCP 连接后,浏览器在发送请求时附带以下头部信息:
GET /chat HTTP/1.1
Host: www.example.com
Upgrade: websocket
Connection: Upgrade
这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时互相推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用程序会发送基于 JSON 的文本消息。
现代浏览器均已支持 WebSocket 协议,服务器端则需要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才能使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。

实践演示
Java后端
我们以实际代码来演示如何在Springboot项目中实现对Websocket的支持。
step1: 添加websocket依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
step2: 增加配置
这个配置的主要作用是自动启动使用了注解==@ServerEndpoint==的类
@Configuration
@EnableWebSocket
public class WebSocketConfiguration {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
step3: 创建一个ws endpoint
@ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}")
@Component
@Slf4j
public class LogWsEndpoint implements Consumer<ChaosLogEvent> {// 对话的标识private String bizKey;// 存储每个会话private static final ConcurrentHashMap<String, List<LogWsEndpoint>> endpointMap = new ConcurrentHashMap<>();// 将会话放入到线程池中,异步将数据返回给前端private static ThreadPoolExecutor wsThreadPoolExecutor;// 核心逻辑处理器private ChaosLogEventHandler handler;// 业务写和读logprivate static ChaosLogger chaosLogger;@Autowired@Qualifier("wsThreadPoolExecutor")public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {if (null != wsThreadPoolExecutor) {LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor;}}@Autowiredpublic void setChaosLogger(ChaosLogger chaosLogger) {if (null != chaosLogger) {LogWsEndpoint.chaosLogger = chaosLogger;}}@OnOpenpublic void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) {this.bizKey = String.format("%s-%s", bizType, bizId);log.info("[ws-chaos-log]连接建立中 ==> bizKey : {}", bizKey);this.handler = new ChaosLogEventHandler(chaosLogger, session);wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true));endpointMap.compute(bizKey, (key, value) -> {List<LogWsEndpoint> ends = null == value ? new ArrayList<>() : value;ends.add(this);return ends;});log.info("[ws-chaos-log]连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey);}public void flushMessage(String bizType, String bizId, boolean force) {this.handler.flushMessage(bizType, bizId, force);}@OnClosepublic void onClose() {log.info("websocket log server close");if (StringUtils.isBlank(bizKey)) {return;}endpointMap.compute(bizKey, (key, endpoints) -> {if (null != endpoints) {endpoints.remove(this);}return endpoints;});log.info("[ws-chaos-log]连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey);}@OnMessagepublic void onMessage(String message, Session session) throws IOException {log.info("[ws-chaos-log]服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message);}@OnErrorpublic void onError(Session session, Throwable error) {log.error("[ws-chaos-log]WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey);}@Overridepublic void accept(ChaosLogEvent chaosLogEvent) {String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId());log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent));List<LogWsEndpoint> logWsEndpoints = endpointMap.get(contextId);if (CollectionUtils.isEmpty(logWsEndpoints)) {return;}logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));}
}
==注意:上面有个accept()==方法,这个方法后面也会讲到,主要就是用于触发已经建立连接Websocket发送消息。
核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。
@Slf4j
public class ChaosLogEventHandler {private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5Mprivate final ChaosLogger chaosLogger;@Getterprivate final Session session;private final AtomicLong offset = new AtomicLong(-1L);private final AtomicBoolean hasTruncated = new AtomicBoolean(false);private final AtomicLong waitEventCnt = new AtomicLong(0L);private final Lock lock = new ReentrantLock();public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {this.chaosLogger = chaosLogger;this.session = session;}public void flushMessage(String bizType, String bizId, boolean force) {String bizKey = bizType + "-" + bizId;if (!lock.tryLock()) {waitEventCnt.incrementAndGet();log.info("[WS]获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);return;}try {if (!force && waitEventCnt.getAndSet(0L) < 1) {log.info("[ws-chaos-log]没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);// 没有待处理的事件return;}log.info("[ws-chaos-log]向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get());if (offset.get() < 0) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);log.info("[ws-chaos-log]contentLength = {} for bizLogKey {}", contentLength, bizKey);if (contentLength == 0) {return;}if (contentLength > READ_LOG_MOST_LEN) {offset.set(contentLength - READ_LOG_MOST_LEN);hasTruncated.set(true);log.info("[ws-chaos-log]文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());} else {offset.set(0L);}} else if (!force) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);if (contentLength <= offset.get()) {log.info("[ws-chaos-log]文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());return;}}// 读取日志内容BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {String line = null;while (null != (line = reader.readLine())) {if (hasTruncated.get()) {hasTruncated.set(false);log.info("[ws-chaos-log]hasTruncated changed to false");} else {log.info("[ws-chaos-log]send ws msg:{}", line);try {session.getBasicRemote().sendText(line + "\n");} catch (IllegalStateException e) {log.info("[ws-chaos-log]发送消息过程中连接状态异常,跳过", e);}}// +1是因为每一行结尾会有一个回车offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1);}} catch (IOException e) {log.error("", e);}} catch (NotFoundException e) {log.info("[ws-chaos-log]未找到数据,无需向客户端同步,bizKey:{}", bizKey, e);} catch (RuntimeException e) {log.error("", e);} finally {lock.unlock();}log.info("[ws-chaos-log]向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey);// 在处理过程中,可能又有新的事件,所以再次尝试刷新数据flushMessage(bizType, bizKey, false);}
}
stept5: 广播事件,全局监听
前后端建立连接的时候,绑定了后端一台机器,但是后台一般都是多台服务器,如果事件传递到其他服务器,那么已经建立的连接如何监听到并返回内呢?
这里使用了rocketmq的机制,每台机器都会监听到事件的变化,从而触发当前机器将变更内容发回到前端。
@Component
@RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class ChaosLogEventConsumer implements RocketMQListener<String> {@Autowired(required = false)private List<Consumer<ChaosLogEvent>> chaosLogEventConsumers = Collections.emptyList();@Overridepublic void onMessage(String message) {log.info("[MQ]receive ChaosLogEvent message:{}", message);ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class);for (Consumer<ChaosLogEvent> consumer : chaosLogEventConsumers) {try {consumer.accept(event);} catch (RuntimeException e) {log.error("[MQ] failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e);}}}
}
前端代码实现
以react为例,仅供参考:
export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => {if (!bizType || !bizId) {console.log('fetchLogs: logContextToken or node is null')return}setLogs([])if (logsRef.current[0]) {console.log('close ws')logsRef.current[0].close()}let host = wsHost ? wsHost : window.location.hostlet protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`)logsRef.current = [client, []]// 报错的回调函数client.onerror = (e: any) => {console.log('Connection Error')console.log(e)}//链接打开的回调函数client.onopen = () => {console.log('WebSocket Client Connected')}//链接关闭的回调函数client.onclose = () => {console.log('echo-protocol Client Closed')}//收到消息的处理函数client.onmessage = (e: any) => {if (logsRef.current[0] === client) {if (typeof e.data === 'string') {let newLogs = [...logsRef.current[1], e.data]if (newLogs.length > 250) {newLogs = newLogs.slice(200)}setLogs(newLogs)logsRef.current = [client, newLogs]}} else {client.close()}}const sendPing = () => {if (logsRef.current[0] === client) {const data = { message: 'heartbeat' }client.send(JSON.stringify(data))setTimeout(sendPing, 10000)}}setTimeout(sendPing, 10000)
}
相关文章:
使用WebSocket实现log日志流的实时展示-从轮询到通知
场景介绍 最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源&#…...
UE5 从零开始制作跟随的大鹅
文章目录 二、绑定骨骼三、创建 ControlRig四、创建动画五、创建动画蓝图六、自动寻路七、生成 goose八、碰撞 和 Physics Asset缺点 # 一、下载模型 首先我们需要下载一个静态网格体,这里我们可以从 Sketchfab 中下载:Goose Low Poly - Download Free …...
O’Reilly
--江上往来人,但爱鲈鱼美。 --君看一叶舟,出没风波里。 OReilly OReilly出版社出版的技术类图书 俗称动物系列 应该是每个技术人员的必备手册。 OReilly动物系列(中译本) 简介" 动物系列作为 OReilly 书籍的典型代表被普遍…...
优盘驱动器未格式化:数据拯救行动指南
优盘困境:驱动器未格式化的挑战 在日常的数据存储与传输中,优盘以其便携性和高容量成为了我们不可或缺的伙伴。然而,当您尝试访问优盘时,突然弹出的“驱动器未被格式化”提示却如同晴天霹雳,让人措手不及。这一状况不…...
4.Handler mappings
处理程序映射 简介 在早期版本的 Spring 中,用户需要在 Web 应用程序上下文中定义一个或多个 HandlerMapping bean 以将传入的 Web 请求映射到适当的处理程序。随着注解控制器的引入,通常不再需要这样做,因为 RequestMappingHandlerMapping…...
《学会 SpringMVC 系列 · 消息转换器 MessageConverters》
📢 大家好,我是 【战神刘玉栋】,有10多年的研发经验,致力于前后端技术栈的知识沉淀和传播。 💗 🌻 CSDN入驻不久,希望大家多多支持,后续会继续提升文章质量,绝不滥竽充数…...
深度学习项目 -7-使用 Python 的手写数字识别
一、前言 该文章仅作为个人学习使用 二、正文 项目源代码:深度学习项目 - 使用 Python 进行手写数字识别 - DataFlair (data-flair.training) 数据集:https://drive.google.com/open?id1hJiOlxctFH3uL2yTqXU_1f6c0zLr8V_K Python 深…...
MySQL —— 库,数据类型 与 表
库与基础操作 1.1 查看数据库 使用 show databases; 可以查看当前 MySQL 目前有多少个数据库 5 rows 表示有 5 行,这里是表示的是有效的数据,不包括 第一行的指引 set 表示结果集合 0.01 sec 表示这个 sql 语句一共运行了0.01 秒,一般情况…...
Java重修笔记 第二十七天 匿名内部类
匿名内部类 1. 定义:无类名(底层自动分配类名“外部类名$1”),既是类也是对象,定义在外部类的局部位置,例如方法体和代码块中,通过new类或接口并在大括号里重写方法来实现。 2. 使用场景&…...
Nero Lens 智图 - 适用于 iOS 和 iPadOS 的专业图片处理 App
首先是手机端的无损放大 App:Nero Lens 智图,适用于 iOS 和 iPadOS,不仅可以放大,还有多种 AI 图片增强功能。 使用这款 App 可以通过 AI 模型智能放大可达 400%,还有老照片去划痕、上色,抠图移除背景、照…...
Nginx代理路径被吃
Nginx代理路径被吃的情况 日常工作中经常使用nginx反向代理一些资源,有时正常代理,发现代理不过去。 验证被吃调location情况 通过浏览器访问: https://zhao138969.com/LinuxPackage/Python/SelectDocker location /LinuxPackage { proxy…...
pytest-html报告修改与汉化
前言 Pytest框架可以使用两种测试报告,其中一种就是使用pytest-html插件生成的测试报告,但是报告中有一些信息没有什么用途或者显示的不太好看,还有一些我们想要在报告中展示的信息却没有,最近又有人问我pytest-html生成的报告&a…...
react-native从入门到实战系列教程一Swiper组件的使用及bug修复
轮播图,在app中随处可见,这么重要的功能我们怎么可能不学习下在react-native中的实现方式。 依然是第三方组件react-native-swiper 官网地址 https://www.npmjs.com/package/react-native-swiper 组件使用的组件及事件参考官方即可。 实现效果 官网…...
springboot开发的常用注解总结-配置组件类注解
Spring Boot 提供了许多注解,这些注解大大简化了 Spring 应用的配置和开发过程。以下是一些常见的 Spring Boot注解及其作用。 目录 配置组件类 (Configure Component )Configuration解释:Demo Code:更深度使用&#x…...
DataX 最新版本安装部署
1、下载 git clone gitgithub.com:alibaba/DataX.git 2、打包 mvn -U clean package assembly:assembly -Dmaven.test.skiptrue...
【架构】应用保护
这篇文章总结一下应用保护的手段。如今说到应用保护,更多的会想到阿里的sentinel,手段丰富,应用简单。sentinel的限流、降级、熔断,可以自己去试一下,sentinel主要通过配置实现功能,不难。sentinel的简介放…...
从核心到边界:六边形、洋葱与COLA架构的深度解析
文章目录 1 引言2 软件架构3 架构分类4 典型的应用架构4.1 分层架构4.2 CQRS4.3 六边形架构4.4 洋葱架构4.5 DDD 5 COLA架构设计5.1 分层设计5.2 扩展设计5.3 规范设计5.3.1 组件规范5.3.2 包规范5.3.3 命名规范 6 COLA架构总览7 小结 1 引言 软件的首要技术使命:管…...
04-Fastjson反序列化漏洞
免责声明 本文仅限于学习讨论与技术知识的分享,不得违反当地国家的法律法规。对于传播、利用文章中提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,本文作者不为此承担任何责任,一旦造成后果请自行承担&…...
ABC365(A-D)未补
A - Leap Year(模拟) 题意:给定一个数字n,如果n不是4的倍数,输出365;如果n是4的倍数但不是100的倍数,输出366;如果n是100的倍数但不是400的倍数,输出365;如果…...
Python用png生成不同尺寸的图标
Kimi生成 from PIL import Imagedef generate_icon(source_image_path, output_image_path, size):with Image.open(source_image_path) as img:# 转换图片为RGBA模式,确保有透明通道if img.mode ! RGBA:img img.convert(RGBA)# 调整图片大小到指定尺寸img img.r…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...
视觉slam十四讲实践部分记录——ch2、ch3
ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...
[ACTF2020 新生赛]Include 1(php://filter伪协议)
题目 做法 启动靶机,点进去 点进去 查看URL,有 ?fileflag.php说明存在文件包含,原理是php://filter 协议 当它与包含函数结合时,php://filter流会被当作php文件执行。 用php://filter加编码,能让PHP把文件内容…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...
Ubuntu系统多网卡多相机IP设置方法
目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机,交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息,系统版本:Ubuntu22.04.5 LTS;内核版本…...
在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7
在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤: 第一步: 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为: // 改为 v…...
