使用WebSocket实现log日志流的实时展示-从轮询到通知
场景介绍
最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源,后端没有数据的时候,会造成大量的无用轮询。所以这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。
WebSocket介绍
参考:https://liaoxuefeng.com/books/java/spring/web/websocket/
WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模型,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当需要定期或不定期向浏览器推送数据(例如股票行情或在线聊天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。
由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上进行了简单的升级。建立 TCP 连接后,浏览器在发送请求时附带以下头部信息:
GET /chat HTTP/1.1
Host: www.example.com
Upgrade: websocket
Connection: Upgrade
这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时互相推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用程序会发送基于 JSON 的文本消息。
现代浏览器均已支持 WebSocket 协议,服务器端则需要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才能使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。
实践演示
Java后端
我们以实际代码来演示如何在Springboot项目中实现对Websocket的支持。
step1: 添加websocket依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
step2: 增加配置
这个配置的主要作用是自动启动使用了注解==@ServerEndpoint==的类
@Configuration
@EnableWebSocket
public class WebSocketConfiguration {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
step3: 创建一个ws endpoint
@ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}")
@Component
@Slf4j
public class LogWsEndpoint implements Consumer<ChaosLogEvent> {// 对话的标识private String bizKey;// 存储每个会话private static final ConcurrentHashMap<String, List<LogWsEndpoint>> endpointMap = new ConcurrentHashMap<>();// 将会话放入到线程池中,异步将数据返回给前端private static ThreadPoolExecutor wsThreadPoolExecutor;// 核心逻辑处理器private ChaosLogEventHandler handler;// 业务写和读logprivate static ChaosLogger chaosLogger;@Autowired@Qualifier("wsThreadPoolExecutor")public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {if (null != wsThreadPoolExecutor) {LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor;}}@Autowiredpublic void setChaosLogger(ChaosLogger chaosLogger) {if (null != chaosLogger) {LogWsEndpoint.chaosLogger = chaosLogger;}}@OnOpenpublic void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) {this.bizKey = String.format("%s-%s", bizType, bizId);log.info("[ws-chaos-log]连接建立中 ==> bizKey : {}", bizKey);this.handler = new ChaosLogEventHandler(chaosLogger, session);wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true));endpointMap.compute(bizKey, (key, value) -> {List<LogWsEndpoint> ends = null == value ? new ArrayList<>() : value;ends.add(this);return ends;});log.info("[ws-chaos-log]连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey);}public void flushMessage(String bizType, String bizId, boolean force) {this.handler.flushMessage(bizType, bizId, force);}@OnClosepublic void onClose() {log.info("websocket log server close");if (StringUtils.isBlank(bizKey)) {return;}endpointMap.compute(bizKey, (key, endpoints) -> {if (null != endpoints) {endpoints.remove(this);}return endpoints;});log.info("[ws-chaos-log]连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey);}@OnMessagepublic void onMessage(String message, Session session) throws IOException {log.info("[ws-chaos-log]服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message);}@OnErrorpublic void onError(Session session, Throwable error) {log.error("[ws-chaos-log]WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey);}@Overridepublic void accept(ChaosLogEvent chaosLogEvent) {String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId());log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent));List<LogWsEndpoint> logWsEndpoints = endpointMap.get(contextId);if (CollectionUtils.isEmpty(logWsEndpoints)) {return;}logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));}
}
==注意:上面有个accept()==方法,这个方法后面也会讲到,主要就是用于触发已经建立连接Websocket发送消息。
核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。
@Slf4j
public class ChaosLogEventHandler {private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5Mprivate final ChaosLogger chaosLogger;@Getterprivate final Session session;private final AtomicLong offset = new AtomicLong(-1L);private final AtomicBoolean hasTruncated = new AtomicBoolean(false);private final AtomicLong waitEventCnt = new AtomicLong(0L);private final Lock lock = new ReentrantLock();public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {this.chaosLogger = chaosLogger;this.session = session;}public void flushMessage(String bizType, String bizId, boolean force) {String bizKey = bizType + "-" + bizId;if (!lock.tryLock()) {waitEventCnt.incrementAndGet();log.info("[WS]获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);return;}try {if (!force && waitEventCnt.getAndSet(0L) < 1) {log.info("[ws-chaos-log]没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);// 没有待处理的事件return;}log.info("[ws-chaos-log]向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get());if (offset.get() < 0) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);log.info("[ws-chaos-log]contentLength = {} for bizLogKey {}", contentLength, bizKey);if (contentLength == 0) {return;}if (contentLength > READ_LOG_MOST_LEN) {offset.set(contentLength - READ_LOG_MOST_LEN);hasTruncated.set(true);log.info("[ws-chaos-log]文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());} else {offset.set(0L);}} else if (!force) {long contentLength = chaosLogger.getLogContentLength(bizType, bizId);if (contentLength <= offset.get()) {log.info("[ws-chaos-log]文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());return;}}// 读取日志内容BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {String line = null;while (null != (line = reader.readLine())) {if (hasTruncated.get()) {hasTruncated.set(false);log.info("[ws-chaos-log]hasTruncated changed to false");} else {log.info("[ws-chaos-log]send ws msg:{}", line);try {session.getBasicRemote().sendText(line + "\n");} catch (IllegalStateException e) {log.info("[ws-chaos-log]发送消息过程中连接状态异常,跳过", e);}}// +1是因为每一行结尾会有一个回车offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1);}} catch (IOException e) {log.error("", e);}} catch (NotFoundException e) {log.info("[ws-chaos-log]未找到数据,无需向客户端同步,bizKey:{}", bizKey, e);} catch (RuntimeException e) {log.error("", e);} finally {lock.unlock();}log.info("[ws-chaos-log]向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey);// 在处理过程中,可能又有新的事件,所以再次尝试刷新数据flushMessage(bizType, bizKey, false);}
}
stept5: 广播事件,全局监听
前后端建立连接的时候,绑定了后端一台机器,但是后台一般都是多台服务器,如果事件传递到其他服务器,那么已经建立的连接如何监听到并返回内呢?
这里使用了rocketmq的机制,每台机器都会监听到事件的变化,从而触发当前机器将变更内容发回到前端。
@Component
@RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class ChaosLogEventConsumer implements RocketMQListener<String> {@Autowired(required = false)private List<Consumer<ChaosLogEvent>> chaosLogEventConsumers = Collections.emptyList();@Overridepublic void onMessage(String message) {log.info("[MQ]receive ChaosLogEvent message:{}", message);ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class);for (Consumer<ChaosLogEvent> consumer : chaosLogEventConsumers) {try {consumer.accept(event);} catch (RuntimeException e) {log.error("[MQ] failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e);}}}
}
前端代码实现
以react为例,仅供参考:
export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => {if (!bizType || !bizId) {console.log('fetchLogs: logContextToken or node is null')return}setLogs([])if (logsRef.current[0]) {console.log('close ws')logsRef.current[0].close()}let host = wsHost ? wsHost : window.location.hostlet protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`)logsRef.current = [client, []]// 报错的回调函数client.onerror = (e: any) => {console.log('Connection Error')console.log(e)}//链接打开的回调函数client.onopen = () => {console.log('WebSocket Client Connected')}//链接关闭的回调函数client.onclose = () => {console.log('echo-protocol Client Closed')}//收到消息的处理函数client.onmessage = (e: any) => {if (logsRef.current[0] === client) {if (typeof e.data === 'string') {let newLogs = [...logsRef.current[1], e.data]if (newLogs.length > 250) {newLogs = newLogs.slice(200)}setLogs(newLogs)logsRef.current = [client, newLogs]}} else {client.close()}}const sendPing = () => {if (logsRef.current[0] === client) {const data = { message: 'heartbeat' }client.send(JSON.stringify(data))setTimeout(sendPing, 10000)}}setTimeout(sendPing, 10000)
}
相关文章:
使用WebSocket实现log日志流的实时展示-从轮询到通知
场景介绍 最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源&#…...
UE5 从零开始制作跟随的大鹅
文章目录 二、绑定骨骼三、创建 ControlRig四、创建动画五、创建动画蓝图六、自动寻路七、生成 goose八、碰撞 和 Physics Asset缺点 # 一、下载模型 首先我们需要下载一个静态网格体,这里我们可以从 Sketchfab 中下载:Goose Low Poly - Download Free …...
O’Reilly
--江上往来人,但爱鲈鱼美。 --君看一叶舟,出没风波里。 OReilly OReilly出版社出版的技术类图书 俗称动物系列 应该是每个技术人员的必备手册。 OReilly动物系列(中译本) 简介" 动物系列作为 OReilly 书籍的典型代表被普遍…...
优盘驱动器未格式化:数据拯救行动指南
优盘困境:驱动器未格式化的挑战 在日常的数据存储与传输中,优盘以其便携性和高容量成为了我们不可或缺的伙伴。然而,当您尝试访问优盘时,突然弹出的“驱动器未被格式化”提示却如同晴天霹雳,让人措手不及。这一状况不…...
4.Handler mappings
处理程序映射 简介 在早期版本的 Spring 中,用户需要在 Web 应用程序上下文中定义一个或多个 HandlerMapping bean 以将传入的 Web 请求映射到适当的处理程序。随着注解控制器的引入,通常不再需要这样做,因为 RequestMappingHandlerMapping…...
《学会 SpringMVC 系列 · 消息转换器 MessageConverters》
📢 大家好,我是 【战神刘玉栋】,有10多年的研发经验,致力于前后端技术栈的知识沉淀和传播。 💗 🌻 CSDN入驻不久,希望大家多多支持,后续会继续提升文章质量,绝不滥竽充数…...
深度学习项目 -7-使用 Python 的手写数字识别
一、前言 该文章仅作为个人学习使用 二、正文 项目源代码:深度学习项目 - 使用 Python 进行手写数字识别 - DataFlair (data-flair.training) 数据集:https://drive.google.com/open?id1hJiOlxctFH3uL2yTqXU_1f6c0zLr8V_K Python 深…...
MySQL —— 库,数据类型 与 表
库与基础操作 1.1 查看数据库 使用 show databases; 可以查看当前 MySQL 目前有多少个数据库 5 rows 表示有 5 行,这里是表示的是有效的数据,不包括 第一行的指引 set 表示结果集合 0.01 sec 表示这个 sql 语句一共运行了0.01 秒,一般情况…...
Java重修笔记 第二十七天 匿名内部类
匿名内部类 1. 定义:无类名(底层自动分配类名“外部类名$1”),既是类也是对象,定义在外部类的局部位置,例如方法体和代码块中,通过new类或接口并在大括号里重写方法来实现。 2. 使用场景&…...
Nero Lens 智图 - 适用于 iOS 和 iPadOS 的专业图片处理 App
首先是手机端的无损放大 App:Nero Lens 智图,适用于 iOS 和 iPadOS,不仅可以放大,还有多种 AI 图片增强功能。 使用这款 App 可以通过 AI 模型智能放大可达 400%,还有老照片去划痕、上色,抠图移除背景、照…...
Nginx代理路径被吃
Nginx代理路径被吃的情况 日常工作中经常使用nginx反向代理一些资源,有时正常代理,发现代理不过去。 验证被吃调location情况 通过浏览器访问: https://zhao138969.com/LinuxPackage/Python/SelectDocker location /LinuxPackage { proxy…...
pytest-html报告修改与汉化
前言 Pytest框架可以使用两种测试报告,其中一种就是使用pytest-html插件生成的测试报告,但是报告中有一些信息没有什么用途或者显示的不太好看,还有一些我们想要在报告中展示的信息却没有,最近又有人问我pytest-html生成的报告&a…...
react-native从入门到实战系列教程一Swiper组件的使用及bug修复
轮播图,在app中随处可见,这么重要的功能我们怎么可能不学习下在react-native中的实现方式。 依然是第三方组件react-native-swiper 官网地址 https://www.npmjs.com/package/react-native-swiper 组件使用的组件及事件参考官方即可。 实现效果 官网…...
springboot开发的常用注解总结-配置组件类注解
Spring Boot 提供了许多注解,这些注解大大简化了 Spring 应用的配置和开发过程。以下是一些常见的 Spring Boot注解及其作用。 目录 配置组件类 (Configure Component )Configuration解释:Demo Code:更深度使用&#x…...
DataX 最新版本安装部署
1、下载 git clone gitgithub.com:alibaba/DataX.git 2、打包 mvn -U clean package assembly:assembly -Dmaven.test.skiptrue...
【架构】应用保护
这篇文章总结一下应用保护的手段。如今说到应用保护,更多的会想到阿里的sentinel,手段丰富,应用简单。sentinel的限流、降级、熔断,可以自己去试一下,sentinel主要通过配置实现功能,不难。sentinel的简介放…...
从核心到边界:六边形、洋葱与COLA架构的深度解析
文章目录 1 引言2 软件架构3 架构分类4 典型的应用架构4.1 分层架构4.2 CQRS4.3 六边形架构4.4 洋葱架构4.5 DDD 5 COLA架构设计5.1 分层设计5.2 扩展设计5.3 规范设计5.3.1 组件规范5.3.2 包规范5.3.3 命名规范 6 COLA架构总览7 小结 1 引言 软件的首要技术使命:管…...
04-Fastjson反序列化漏洞
免责声明 本文仅限于学习讨论与技术知识的分享,不得违反当地国家的法律法规。对于传播、利用文章中提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,本文作者不为此承担任何责任,一旦造成后果请自行承担&…...
ABC365(A-D)未补
A - Leap Year(模拟) 题意:给定一个数字n,如果n不是4的倍数,输出365;如果n是4的倍数但不是100的倍数,输出366;如果n是100的倍数但不是400的倍数,输出365;如果…...
Python用png生成不同尺寸的图标
Kimi生成 from PIL import Imagedef generate_icon(source_image_path, output_image_path, size):with Image.open(source_image_path) as img:# 转换图片为RGBA模式,确保有透明通道if img.mode ! RGBA:img img.convert(RGBA)# 调整图片大小到指定尺寸img img.r…...
1688中国站获得工厂档案信息 API
公共参数 名称类型必须描述keyString是免费申请调用key(必须以GET方式拼接在URL中)secretString是调用密钥api_nameString是API接口名称(包括在请求地址中)[item_search,item_get,item_search_shop等]cacheString否[yes,no]默认y…...
定时任务框架 xxl-job
🍓 简介:java系列技术分享(👉持续更新中…🔥) 🍓 初衷:一起学习、一起进步、坚持不懈 🍓 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正🙏 🍓 希望这篇文章对你有所帮助,欢…...
C/C++关键字大全
目录 一、const 二、static 三、#define 和 typedef 四、#define 和 inline 五、#define 和 const 六、new 和 malloc 七、const 和 constexpr 八、volatile 九、extern 十、前置 和后置 十一、atomic 十二、struct 和 class 一、const 1、const 关键字可用于定义…...
ROS2 Linux Mint 22 安装教程
前言: 本教程在Linux系统上使用。 一、linux安装 移动硬盘安装linux:[LinuxToGo教程]把ubuntu装进移动固态,随时随用以下是我建议安装linux mint版本的清单: 图吧工具箱:https://www.tbtool.cn/linux mint: https://…...
快速将网站从HTTP升级为HTTPS
在当今数字化的世界中,网络安全变的越来越重要,HTTPS(超文本传输安全协议)不仅能够提供加密的数据传输,还能增强用户信任度,提升搜索引擎排名,为网站带来多重益处。所以将网站从HTTP升级到HTTPS…...
Qt程序移植至Arm开发板
目录 1.工具准备: 系统调试工具SecureCRT 虚拟机安装linux(Ubuntu) 交叉编译工具链 ARM 端Qt 环境(Qt-5.7.1) 1) linux processor SD安装 2)交叉编译工具链配置 2.编译Qt工程: 2.0 交叉编译 依赖库源码,生成动…...
删除分区 全局索引 drop partition global index Statistics变化
1.不一定unusable,可以先删除data (index 再删除过程中会更新结构)再drop/truncate. ---------------------- CREATE TABLE interval_sale ( prod_id NUMBER(6) , cust_id NUMBER , time_id DATE ) PARTITION BY RANGE (time_i…...
git回退未commit、回退已commit、回退已push、合并某一次commit到另一个分支
文章目录 1、git回退未commit2、git回退已commit3、git回退已push的代码3.1 直接丢弃某一次的push3.2 撤销push后,不丢弃改动,重新修改后要再次push 4、合并某一次commit到另一个分支 整理几个工作上遇到的git问题。 1、git回退未commit git回退未comm…...
yolov8pose 部署rknn(rk3588)、部署地平线Horizon、部署TensorRT,部署工程难度小、模型推理速度快,DFL放后处理中
特别说明:参考官方开源的yolov8代码、瑞芯微官方文档、地平线的官方文档,如有侵权告知删,谢谢。 模型和完整仿真测试代码,放在github上参考链接 模型和代码。 之前写了yolov8、yolov8seg、yolov8obb 的 DFL 放在模型中和放在后处理…...
程序员找工作之操作系统面试题总结分析
程序员在找工作面试时,操作系统方面可能会被问到的问题涵盖了多个核心知识点和概念。以下是对这些面试问题的总结和分析: 1. 核心硬件与体系结构 微机的核心部件:询问微机硬件系统中最核心的部件是什么(CPU)。处理机…...
深圳市做网站公司/怎么做网站宣传
关于深度学习的框架之争一直都没停止过,每隔一阵大家就要进行一次框架大讨论: TensorFlow的使用者虽多,又有谷歌的背书,但真的很!难!用! Pytorch虽然入门简单、上手快,但因为开源时…...
哪个网站做代购/网站排行
最近看了下Nutch,目前Nutch最新版本2.3.1,支持Hbase、MongoDB等存儲,但在搭建和測試過程中發現對Mysql 的支持好像有點問題。后來將Nutch版本改為2.2.1。基於Nutch2.2.1Mysql 的環境配置過程如下:1.下載Nutch2.2.1 源碼࿱…...
wordpress.主题/关键词搜索指数
微服务架构是互联网很热门的话题,是互联网技术发展的必然结果。它提倡将单一应用程序划分成一组小的服务,服务之间互相协调、互相配合,为用户提供最终价值。虽然微服务架构没有公认的技术标准和规范或者草案,但业界已经有一些很有…...
wordpress win2012 r2/学做网站培训班要多少钱
手机QQ的聊天框很漂亮,包括好多短信交互框也做成类似的风格,各种效果,各种炫,至于不规则形状的那种(称为手绘风格),比较麻烦,这里使用CSS3新特性,border-radius,进行信息框交互内容的设计. border-radius样式用于制作圆角边框,通过像素,百分比等单位制定圆角的形状.如果希望单独…...
wordpress首页默认文件夹/seo整站优化方案
⭐⭐欢迎关注博客主页:https://blog.csdn.net/u013411339 ⭐⭐欢迎点赞 👍 收藏 ⭐留言 📝 ,欢迎留言交流! ⭐⭐本文由【王知无】原创,首发于 CSDN博客! ⭐⭐本文首发CSDN论坛,未经过官方和本人允许,严禁转载! 本文是对《【硬刚大数据之学习路线篇】从零到大数据专…...
苹果电脑做网站好用吗/郑州网站建设价格
本文实例讲述了php实现mysqli批量执行多条语句的方法。分享给大家供大家参考,具体如下:可以一次性的执行多个操作或取回多个结果集。实例:$mysqli new mysqli("localhost", "root", "111111", "test&quo…...