Go WebSocket入门+千万级别弹幕系统架构设计
Go实现WebSocket(千万级别弹幕系统架构设计)
1 websocket简介(基于HTTP协议的长连接)
使用WebSocket可以轻松的维持服务器端长连接,其次WebSocket是架构在HTTP协议之上的,并且也可以使用HTTPS方式,因此WebSocket是可靠传输,并且不需要开发者关注底层细节。
- websocket具体细节:
①Upgrade:浏览器告知服务器升级为WebSocket协议
②Switch:服务器升级成功后会返回101状态码
③Communicate:浏览器和服务器就可以以WebSocket格式发送数据
- 还有一种推送数据的方式是SSE:
①SSE(Server Send Event):服务器单项推送消息,text/event-stream,它是一种流,可以返回多次数据
②使用场景:CI/CD,ChatGPT回答问题
详细文章:推送数据— —WebSocket与SSE
2 弹幕业务的技术选择(推、拉模式)
2.1 客户端拉(服务器压力过大,类似DDoS)
如果是客户端拉取服务器端数据,那么将会存在以下几个问题:
- 直播在线人数多就意味着消息数据更新频率高,拉取消息意味着弹幕无法满足时效性
- 如果很多客户端同时拉取,那么服务器端的压力无异于DDOS
- 一个弹幕系统应该是通用的,因此对于直播间弹幕较少的场景,意味着消息数据拉取请求都是无效的
2.2 服务端推(服务端需要维护大量长连接)
推送模式:当数据发生更新的时候服务器端主动推送到客户端,这样可以有效减少客户端的请求次数。
- 如果需要实现消息推送,那么就意味着服务器端维护大量的长连接。
3 技术实现(go)
🎆完整代码:
- go实现websocket:
https://github.com/ziyifast/ziyifast-code_instruction/tree/main/go-demo/go-websocket/1-simple - go实现简易弹幕系统:
https://github.com/ziyifast/ziyifast-code_instruction/tree/main/go-demo/go-websocket/2-boardcast
其他教程:Java实现简易聊天室
3.1 前端页面
index.html:
<!DOCTYPE html>
<html>
<head><title>go websocket</title><meta charset="utf-8"/>
</head>
<body>
<script type="text/javascript">var wsUri = "ws://127.0.0.1:7777/ws";var output;function init() {output = document.getElementById("output");testWebSocket();}function testWebSocket() {websocket = new WebSocket(wsUri);websocket.onopen = function (evt) {onOpen(evt)};websocket.onclose = function (evt) {onClose(evt)};websocket.onmessage = function (evt) {onMessage(evt)};websocket.onerror = function (evt) {onError(evt)};}function onOpen(evt) {// writeToScreen("CONNECTED");// doSend("WebSocket rocks");}function onClose(evt) {writeToScreen("DISCONNECTED");}function onMessage(evt) {var message = evt.data;if (message.startsWith("CONNECTED ")) {var connectionId = message.substring("CONNECTED ".length);writeToScreen("CONNECTED: " + connectionId);} else {writeToScreen('<span style="color: blue;">RESPONSE: ' + message + '</span>');}}// function onMessage(evt) {// writeToScreen('<span style="color: blue;">RESPONSE: '+ evt.data+'</span>');// // websocket.close();// }function onError(evt) {writeToScreen('<span style="color: red;">ERROR:</span> ' + evt.data);}function doSend(message) {// writeToScreen("SENT: " + message);websocket.send(message);}function writeToScreen(message) {var pre = document.createElement("p");pre.style.wordWrap = "break-word";pre.innerHTML = message;output.appendChild(pre);}window.addEventListener("load", init, false);function sendBtnClick() {var msg = document.getElementById("input").value;doSend(msg);document.getElementById("input").value = '';}function closeBtnClick() {websocket.close();}
</script>
<h2>WebSocket Test</h2>
<input type="text" id="input"></input>
<button onclick="sendBtnClick()">send</button>
<button onclick="closeBtnClick()">close</button>
<div id="output"></div></body>
</html>
3.2 go-websocket实现
1. model/connection.go:封装websocket连接
整体思路:
1. 封装websocket连接为connection
①维护连接的读写channel
②分别启两个协程for循环,一个用于读,一个用于写
//中间多了一层Channel,保证了线程安全
readLoop -> inChannel -> c.ReadMessage拿到data -> c.WriteMessage(data) -> outChannel -> writeLoop从outChannel中拿到data写回同样的数据到对端
2. conn_mgr:实现connection的管理,一旦有消息发送过来,便广播给其他连接,实现弹幕效果
package modelimport ("errors""github.com/google/uuid""github.com/gorilla/websocket""sync"
)/*整体思路:1. 维护连接的读写channel2. 分别启两个协程for循环,一个用于读,一个用于写//中间多了一层Channel,保证了线程安全readLoop -> inChannel -> c.ReadMessage拿到data -> c.WriteMessage(data) -> outChannel -> writeLoop
*/type Connection struct {ConnID stringConn *websocket.Conn// 读消息队列inChannel chan []byte//写消息队列outChannel chan []byte// 监听Channel是否关闭closeChan chan byte// 标识isClosed boollock sync.Mutex
}// InitConnection 初始化封装的conn
func InitConnection(conn *websocket.Conn) (c *Connection, err error) {connId, err := uuid.NewUUID()if err != nil {return nil, err}c = &Connection{ConnID: connId.String(),Conn: conn,inChannel: make(chan []byte, 1000),outChannel: make(chan []byte, 1000),closeChan: make(chan byte),isClosed: false,}//启动协程读取消息go c.readLoop()go c.writeLoop()return c, nil
}// ReadMessage 读取消息,从inChannel中读取数据(channel保证线程安全,阻塞读取)
func (c *Connection) ReadMessage() (data []byte, err error) {//从inChannel读取数据for {select {case data = <-c.inChannel:return data, nil//监听连接关闭信号,避免一直阻塞读取数据case <-c.closeChan:return nil, errors.New("conn is closed")}}
}// WriteMessage 写消息,将数据写入outChannel(channel保证线程安全,等待write loop从outChannel中获取数据写回连接)
func (c *Connection) WriteMessage(data []byte) (err error) {for {select {case c.outChannel <- data:return nilcase <-c.closeChan:return errors.New("conn is closed")}}
}// 从连接中不断读取数据写入inChannel
func (c *Connection) readLoop() {var (data []byteerr error)for {if _, data, err = c.Conn.ReadMessage(); err != nil {//读取数据失败,关闭连接c.Close()return}select {//读取到数据写到inChannelcase c.inChannel <- data:case <-c.closeChan:c.Close()}}
}// 从outChannel中不断读取数据并发送数据写回对端
func (c *Connection) writeLoop() {var (data []byteerr error)for {select {case data = <-c.outChannel:if err = c.Conn.WriteMessage(websocket.TextMessage, data); err != nil {c.Close()return}case <-c.closeChan:c.Close()return}}
}func (c *Connection) Close() {c.Conn.Close()c.lock.Lock()if !c.isClosed {close(c.closeChan)c.isClosed = true}WebSocketMgr.RemoveConnection(c)c.lock.Unlock()
}
2. model/conn_mgr.go
package modelimport ("fmt""sync"
)type connectionMgr struct {connections map[string]*Connectionlock sync.RWMutex
}var WebSocketMgr = &connectionMgr{connections: make(map[string]*Connection),lock: sync.RWMutex{},
}func (cm *connectionMgr) AddConnection(conn *Connection) {cm.lock.Lock()defer cm.lock.Unlock()cm.connections[conn.ConnID] = connfmt.Printf("connection %s added\n", conn.ConnID)return
}func (cm *connectionMgr) RemoveConnection(conn *Connection) {cm.lock.Lock()defer cm.lock.Unlock()delete(cm.connections, conn.ConnID)fmt.Printf("connection %s removed\n", conn.ConnID)return
}func (cm *connectionMgr) GetConnection(connID string) (conn *Connection, err error) {cm.lock.RLock()defer cm.lock.RUnlock()conn, ok := cm.connections[connID]if !ok {err = fmt.Errorf("connection not found")return}return
}func (cm *connectionMgr) Boardcast(data []byte) {cm.lock.RLock()defer cm.lock.RUnlock()for _, conn := range cm.connections {if err := conn.WriteMessage(data); err != nil {//if err := conn.WriteMessage([]byte(fmt.Sprintf("[%s] %s", conn.ConnID, string(data)))); err != nil {//TODO 补救或者日志记录,或者忽略return}}
}
3. main.go
package mainimport ("github.com/gorilla/websocket""log""myTest/demo_home/go-demo/go-websocket/2-boardcast/model""net/http"
)var (upgrader = websocket.Upgrader{//允许跨域CheckOrigin: func(r *http.Request) bool {return true}}
)func main() {//模拟简易弹幕系统,注意:为了逻辑简洁,并没有做过多的封装,部分代码设计以及安全监测并不合理http.HandleFunc("/ws", wsHandler)http.ListenAndServe(":7777", nil)
}func wsHandler(w http.ResponseWriter, r *http.Request) {var (conn *websocket.Connconnection *model.Connectionerr errordata []byte)if conn, err = upgrader.Upgrade(w, r, nil); err != nil {return}//初始化连接if connection, err = model.InitConnection(conn); err != nil {return}//注册连接model.WebSocketMgr.AddConnection(connection)// 发送连接ID给前端if err := conn.WriteMessage(websocket.TextMessage, []byte("CONNECTED "+connection.ConnID)); err != nil {log.Println("Error sending connection ID:", err)return}go func(c *model.Connection) {for {if data, err = connection.ReadMessage(); err != nil {return}//广播消息model.WebSocketMgr.Boardcast(data)}}(connection)
}
3.3 效果
1. 启动websocket服务端
2. 分别用chrome、Firefox、edge打开页面,建立websocket连接
CONNECTIONID用于标识不同的websocket长连接
- chrome

- Firefox

- edge

3. 不同浏览器相当于不同用户,chrome用户发起一个弹幕,点击send发送弹幕

4. 观察其他用户是否接受到弹幕


自己也会收到自己发的弹幕:

5. edge、firefox用户分别发一个弹幕,观察效果
模拟其他用户发送弹幕



6. chrome退出直播间,其他用户发送弹幕,它接受不到
chrome用户退出直播间,edge、firefox发送弹幕,chrome用户应该接受不到

edge、firefox用户发送弹幕:



后端系统日志:

4 千万级别弹幕系统设计
4.1 难点(内核、锁、CPU瓶颈)
- 内核瓶颈:
- 推送量大:100w在线用户*10条/s = 1000w条/s
- linux内核发送TCP的极限包频约为100w条/s
- 锁瓶颈:
- 推模式需要维护一个存储了100w条数据的集合,比如map
- 推送消息遍历整个集合,顺序发送消息,耗时极长
- 推送期间,客户端仍可以正常上/下线,所以集合需要加锁
- CPU瓶颈
- 浏览器与服务端采用json格式通讯
- json编码非常耗cpu
- 向100w在线用户推送1次,就需要100w次的json encode
4.2 解决方案(多小包合为一个大包)
- 内核瓶颈:
- 减少网络小包的发送
- 将同一秒内的N条消息合并为1条,合并后每秒推送次数只等于在线连接数
- 锁瓶颈:
- 将连接分散到多个集合中,每个集合都有自己的锁
- 多线程并发推送多个集合,避免锁竞争
- 读写锁取代互斥锁,多个推送任务可以遍历相同集合
- cpu瓶颈:
- 减少重复计算,json编码前置:1次消息编码+100w次推送
4.3 分布式架构
如果是单机架构的话:
- 维护海量的连接必然会耗费很多内存
- 消息推送的瞬间也会消耗大量CPU资源
- 消息推送瞬间带宽可能高达400-600MB,4-6Gbits(主要瓶颈,即需要万兆网卡)
因此我们需要分布式架构:
- 网关集群:维护websocket长连接
- 逻辑集群:基于HTTP/2向gateway网关集群分发消息(rpc),与其他服务的交互等
- 业务方
业务方->逻辑集群->网关集群
参考:https://learnku.com/articles/48418
相关文章:
Go WebSocket入门+千万级别弹幕系统架构设计
Go实现WebSocket(千万级别弹幕系统架构设计) 1 websocket简介(基于HTTP协议的长连接) 使用WebSocket可以轻松的维持服务器端长连接,其次WebSocket是架构在HTTP协议之上的,并且也可以使用HTTPS方式,因此WebSocket是可靠…...
uniapp使用伪元素实现气泡
uniapp使用伪元素实现气泡 背景实现思路代码实现尾巴 背景 气泡效果在开发中使用是非常常见的,使用场景有提示框,对话框等等,今天我们使用css来实现气泡效果。老规矩,先看下效果图: 实现思路 其实实现这个气泡框的…...
字节跳动:从梦想之芽到参天大树
字节跳动掌舵人:张一鸣 2012年:梦想的起点:在一个阳光明媚的早晨,北京的一座普通公寓里,一位名叫张一鸣的年轻人坐在电脑前,眼中闪烁着坚定的光芒。他的心中有一个梦想——通过技术改变世界,让…...
组合数学、圆排列、离散数学多重集合笔记
自用 如果能帮到您,那也值得高兴 知识点 离散数学经典题目 多重集合组合 补充容斥原理公式 隔板法题目 全排列题目:...
网络技术原理需要解决的5个问题
解决世界上任意两台设备时如何通讯的?? 第一个问题,pc1和pc3是怎么通讯的? 这俩属于同一个网段,那么同网段的是怎么通讯的? pc1和pc2属于不同的网段,第二个问题,不同网段的设备是…...
【数据结构】链表的大概认识及单链表的实现
目录 一、链表的概念及结构 二、链表的分类 三、单链表的实现 建立链表的节点: 尾插——尾删: 头插——头删: 查找: 指定位置之后删除——插入: 指定位置之前插入——删除指定位置: 销毁链表&am…...
国企:2024年6月中国移动相关招聘信息 二
在线营销服务中心-中国移动通信有限公司在线营销服务中心 硬件工程师 工作地点:河南省-郑州市 发布时间 :2024-06-18 截至时间: 2024-06-30 学历要求:本科及以上 招聘人数:1人 工作经验:3年 岗位描述 1.负责公司拾音器等音视频智能硬件产品全过程管理,包括但…...
Elasticsearch:智能 RAG,获取周围分块(二)
在之前的文章 “Elasticsearch:智能 RAG,获取周围分块(一) ” 里,它介绍了如何实现智能 RAG,获取周围分块。在那个文章里有一个 notebook。为了方便在本地部署的开发者能够顺利的运行那里的 notebook。在本…...
华为---RIP路由协议的汇总
8.3 RIP路由协议的汇总 8.3.1 原理概述 当网络中路由器的路由条目非常多时,可以通过路由汇总(又称路由汇聚或路由聚合)来减少路由条目数,加快路由收敛时间和增强网络稳定性。路由汇总的原理是,同一个自然网段内的不同子网的路由在向外(其他…...
Python基础——字符串常见用法:切片、去空格、替换、拼接
文章目录 专栏导读1、拼接字符串2、获取字符串长度3、字符串切片4、字符串替换:5、字符串分割6、字符串查找7、字符串大小写转换8、字符串去除空白9、字符串格式化:10、字符串编码与解码:11、字符串判断12、字符串填充与对齐总结 专栏导读 &a…...
LeetCode.51N皇后详解
问题描述 按照国际象棋的规则,皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上,并且使皇后彼此之间不能相互攻击。 给你一个整数 n ,返回所有不同的 n 皇后问题 的解决方案…...
计算机网络之奇偶校验码和CRC冗余校验码
今天我们来看看有关于计算机网络的知识——奇偶校验码和CRC冗余校验码,这两种检测编码的方式相信大家在计算机组成原理当中也有所耳闻,所以今天我就来跟大家分享有关他们的知识。 奇偶校验码 奇偶校验码是通过增加冗余位使得码字中1的个数恒为奇数或偶数…...
二叉树经典OJ练习
个人主页:C忠实粉丝 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C忠实粉丝 原创 二叉树经典OJ练习 收录于专栏【数据结构初阶】 本专栏旨在分享学习数据结构学习的一点学习笔记,欢迎大家在评论区交流讨论💌 目录 前置说…...
【OpenHarmony4.1 之 U-Boot 2024.07源码深度解析】008 - make distclean 命令解析
【OpenHarmony4.1 之 U-Boot 2024.07源码深度解析】008 - make distclean 命令解析 一、make V=1 distclean 命令解析系列文章汇总:《【OpenHarmony4.1 之 U-Boot 源码深度解析】000 - 文章链接汇总》 本文链接:《【OpenHarmony4.1 之 U-Boot 2024.07源码深度解析】008 - mak…...
QTreeView双击任意列展开
一.效果 二.原理 重点是如何通过其他列的QModelIndex(假设为index),获取第一列的QModelIndex(假设为firstColumnIndex)。代码如下所示: QModelIndex firstColumnIndex = model->index(index.row(), 0, index.parent()); 这里要注意index函数的第三个参数,第三个参…...
Linux入门攻坚——26、Web Service基础知识与httpd配置-2
http协议 URL:Uniform Resource Locator,统一资源定位符 URL方案:scheme,如http://,https:// 服务器地址:IP:port 资源路径: 示例:http://www.test.com:80/bbs/…...
相由心生与事出反常必有妖
从端午节之日生病起,已就医三次,快半个月了。医检的结论是老病复发—— 上呼吸道感染 。原本并无大碍,加之“水不在深,有龙则灵”的张龙医生处方得当,现已病情好转。只是“800727”趁人之危,兴灾乐祸地欲从…...
微信小程序---支付
一、判断是否登录 如果没有登录,走前端登录流程,不再赘述 二、获取订单编号 跟自己的后端商议入参,然后获取订单编号 三、通过订单编号获取wx.requestPayment()需要的参数 获取订单编号再次请求后端接口,拿到wx.requestPayme…...
Git学习2 -- VSCode中的Git
看了下,主要的插件有3个。自带的Source Control。第1个是Gitlens,第2个是Git Graph。第三个还有个git history。 首先是Source Control。界面大概是这样的。 还是挺直观的。在第一栏source control,可以进行基本的git操作。主要的git操作都是…...
VC++支持断点续下或续传的功能
VC使用多线程和Socket实现断点续下 一、断点续下的基本原理: 1.断点续传的理解可以分为两部分:一部分是断点,一部分是续传。断点的由来是在下载过程中,将一个下载文件分成了多个部分,同时进行多个部分一起的下载&…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
WPF八大法则:告别模态窗口卡顿
⚙️ 核心问题:阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程,导致后续逻辑无法执行: var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题:…...
MFE(微前端) Module Federation:Webpack.config.js文件中每个属性的含义解释
以Module Federation 插件详为例,Webpack.config.js它可能的配置和含义如下: 前言 Module Federation 的Webpack.config.js核心配置包括: name filename(定义应用标识) remotes(引用远程模块࿰…...
五子棋测试用例
一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏,有着深厚的文化底蕴。通过将五子棋制作成网页游戏,可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家,都可以通过网页五子棋感受到东方棋类…...
基于单片机的宠物屋智能系统设计与实现(论文+源码)
本设计基于单片机的宠物屋智能系统核心是实现对宠物生活环境及状态的智能管理。系统以单片机为中枢,连接红外测温传感器,可实时精准捕捉宠物体温变化,以便及时发现健康异常;水位检测传感器时刻监测饮用水余量,防止宠物…...

