go 实现websocket以及详细设计流程过程,确保通俗易懂
websocket简介:
WebSocket 是一种网络传输协议,可在单个 TCP 连接上进行全双工通信,位于 OSI 模型的应用层。WebSocket 协议在 2011 年由 IETF 标准化为 RFC 6455,后由 RFC 7936 补充规范。
WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。
理解各种协议和通信层、套接字的含义
IP:网络层协议;(高速公路)
TCP和UDP:传输层协议;(卡车)
HTTP:应用层协议;(货物)。HTTP(超文本传输协议)是建立在TCP协议之上的一种应用。HTTP连接最显著的特点是客户端发送的每次请求都需要服务器回送响应,在请求结束后,会主动释放连接。从建立连接到关闭连接的过程称为“一次连接”。
SOCKET:套接字,TCP/IP网络的API。(港口码头/车站)Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。socket是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用已实现进程在网络中通信。
Websocket:同HTTP一样也是应用层的协议,但是它是一种双向通信协议,是建立在TCP之上的,解决了服务器与客户端全双工通信的问题,包含两部分:一部分是“握手”,一部分是“数据传输”。握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。
*注:什么是单工、半双工、全工通信?
数据只能单向传送为单工;
数据能双向传送但不能同时双向传送称为半双工;
数据能够同时双向传送则称为全双工。
上面是简单的接受了websocket情况以及和其他协议的区别及联系,在做之前,还是要了解下这块,对后期实战有帮助。
websocket开源选择:
在go语言中,websocket组件比较多的,可以到go仓库搜索下:
今天以仓库使用最多一个开源框架(gorilla)进行实战落地,以及讲解整个过程细节
websocket (github.com/gorilla/websocket)
上面介绍完整体开源情况。
业务背景:
1、数据实时推送到前端进行图形化显示
2、报警数据需要实时推送各端,例如web、cliet、安卓、ios等等其他客户端
我前几年一直从事的是java,可以看看我的播客,基本上与java有关,go也是最近两三周学习的,我其他播客有具体说明,因为公司业务需要,所有就简单的学了一下go语言,作为刚接触go不久的技术人员,如何面对新技术进行探索和落地的,大家可以跟着我的思路进行学习模仿,这样后期学习稍微比较快一些。
实战:
1、在实战之前,我们先看看官网使用说明:
这是最简单的,方式,没有其他业务,我们如何进行最佳实现呢?还是要看官网:
传送门:websocket/examples/chat at main · gorilla/websocket · GitHub
具体源码不说了,但是里面有几个重要点已经出现了,也是我们要学习的思想,这也是为什么要看开源代码的原因,要学习他们的思想和编码技巧。
稍微解释下Hub结构体里的字段:
// Registered clients. // 这是保存客户端连接的信息,map,从这里可以看出来,所有的客户端都要保存到这里,这时可以想到,后期保存到redis里,从这里进行扩展即可。clients map[*Client]bool// Inbound messages from the clients. // 这个就是广播数据了,但是demo了用了字节链,目的是了并行执行,提高效率,字节目的是接收所有情况的数据broadcast chan []byte// Register requests from the clients.
// 这个比较好理解了,是新客户端进行连接时触发的链条,为什么走链,也是为了并行执行,也就是异步执行register chan *Client// Unregister requests from clients.
//这个就是取消注册,也就是关闭客户端连接unregister chan *Client
其实这几个字段已经把我们的框架整体搭建起来了,clients负责存储客户端,broadcast负责服务端发送给客户端数据的,register负责用来监听新建连接,unregister负责关闭客户端连接的
其实websocket也就是干这个事情的,例如websocket服务端收集所有客户端,然后根据需要进行发送消息给客户,然后就是关闭,大致流程就是这样的。
在看客户端怎么进行封装的:
这个就是针对上面的Hub进行组装结构体实例的,这里就不介绍了,本次实战也是根据这个来的,大家看懂这个基本上后续其他开源的websocket都没啥大问题。
接下来真正进行项目实战:
1、第一步创建websocket的客户端管理结构体:
// clientManager
// @Description: 客户端管理者
type clientManager struct {//客户端存储的地方,我这边是用map进行存储,这块可以放到redis上,也是可以的,根据情况扩展即可//这里可以使用map[string]*client,也是可以的,如果这样设计,方便后期进行匹配比较简单,直接匹配key即可,这种方式也是可以的,匹配客户端对象里的keyclients map[*client]bool//广播数据链,进行业务限制,如果没有业务限制,直接使用[]byte 比较通用;用chan进行异步处理broadcast chan model.BusinessDataWrapper//客户端注册链;用chan进行异步处理register chan *client//客户端关闭链;用chan进行异步处理unregister chan *client
}
结构体首字母小写,目的不用暴露出去,用于内部使用即可;每个字段不解释了,上面注释已经写好了,其实和官网是一样的。
2、客户端结构体:
// client
// @Description: 客户端信息
type client struct {//每个客户端连接后都要进行生成唯一key,因为业务场景需求,不同的用户或设备接受的数据要一一对应,后期这块要会做权限控制key string//客户端连接对象socket *websocket.Conn//数据发送链send chan []byte
}
3、编写启动方法,其实官网写的很像,稍微进行重构,更加符合当前项目
func (m *clientManager) start(callBackFunc func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool) {for {select {case client := <-m.register: //进行连接m.clients[client] = truemsg := "有一个新连接出现,已连接成功,客户端key:" + client.keylog.Info(ctx, msg)//发送数据,这块可以不用发送,等后续进行注释即可//m.send([]byte(msg), client)case client := <-m.unregister: //关闭连接,进行释放资源if _, ok := m.clients[client]; ok {close(client.send)delete(m.clients, client)msg := "客户端key:" + client.key + ",已关闭"log.Info(ctx, msg)//m.send([]byte(msg), client)}case businessDataWrapper := <-m.broadcast: //广播数据for client := range m.clients {//这里其实就是发送数据了,//进行转换成json字符串//businessDataJsonStr, _ := json.Marshal(businessDataWrapper)//broadCastSend(m, client, businessDataJsonStr)//进行回调,我这里面目的为了后期这块不在调整代码了,在启动时进行业务处理,方便后期扩展用的,如果毕竟简单,直接使用broadCastSend(m, client, businessDataJsonStr)进行推送信息callBackFunc(client, m, businessDataWrapper)}}}
}// 广播进行发送数据
func broadCastSend(manager *clientManager, client *client, businessDataJsonByte []byte) {select {case client.send <- businessDataJsonByte:default:fmt.Println("关闭连接了,,,,,")close(client.send)delete(manager.clients, client)}
}
4、某一个客户端群发给其他客户端,排除自己客户端
//发送数据,这快类似群发消息
func (m *clientManager) send(message []byte, ignore *client) {for client := range m.clients {//ignore,这是忽略本身客户端,因为这是群发消息,自己可以不用接收了if client != ignore {//将数据写入到通道链client.send <- message}}
}
5、推送数据,第四步是写入到发送通道里,还没真正发送,这块就是真正推送到客户端机制:
func (c *client) write(manager clientManager) {defer func() {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "客户端进行关闭")}()for {select {//如果客户端有数据要进行写出去case message, ok := <-c.send:if !ok {c.socket.WriteMessage(websocket.CloseMessage, []byte{})log.Info(ctx, c.key, "发送关闭提示")return}//这里才是真正的把数据推送到客户端err := c.socket.WriteMessage(websocket.TextMessage, message)if err != nil {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "数据写入失败,进行关闭!")break}}}
}
6、接收客户端发送的数据
func (c *client) read(manager clientManager) {defer func() {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "客户端进行关闭")}()for {_, message, err := c.socket.ReadMessage()if err != nil {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "读数据出现异常,直接关闭。")break}//后期可以注释掉log.Info(ctx, c.key, "接收到客户端发送的数据", string(message))//读到数据,进行业务操作,目前我这边项目只需要推送到客户端即可,所以暂时不做业务了,其他需要做业务,这里做个监听即可}
}
7、提供创建客户端管理函数
// 创建客户端管理器
func newClientManager() *clientManager {return &clientManager{//广播数据,model.BusinessDataWrapper是我具体业务数据,可以换成[]byte接收broadcast: make(chan model.BusinessDataWrapper),register: make(chan *client),unregister: make(chan *client),clients: make(map[*client]bool),}
}
以上就是整体的封装处理,可以看到和官方的demo很像,只是结合了一些业务场景而已,其他的都一样的。
接下来进行和业务进行集成:
1、创建管理器,上面也说了有两个业务场景, 一个是原始数据推送 ,另一个是报警数据推送,所以创建两个管理器出来:
// 原始ws客户端管理器
var rowDataManagerNew = newClientManager()// 报警数据ws客户端管理器
var alarmDataManagerNew = newClientManager()
2、我们注册路由上,通过上面也能推断,需要指定两个路由路径
// 初始化websocket协议配置
var upgrader = websocket.Upgrader{ReadBufferSize: 1024,WriteBufferSize: 1024,CheckOrigin: func(r *http.Request) bool { return true }, //允许跨域 、 允许同源
}// registerRawDataClientConn
//
// @Author zhaosy
// @Description: 注册原始数据客户端连接
// @date 2024-07-16 18:12:19
func registerRawDataClientConn(w http.ResponseWriter, r *http.Request, businessType string, businessId string, userName string) {if lang.IsEmpty(businessType) {io.WriteString(w, "businessType 不能为空")}//生成客户端conn, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Error(ctx, err.Error(), err)io.WriteString(w, "这是一个websocket连接,不是API.")return}clientId := guid.S()
//这个key是我随机生成的一个key,包含了一些业务,大家根据需要进行设置,也可以随机生成就行,就是一行字符串,如果对key没有要求,其实key不用处理也行哈key := websocketRowDataCachePrefix(businessType, businessId, userName, clientId)//初始化客户端对象client := &client{key: key,socket: conn,send: make(chan []byte),}rowDataManagerNew.register <- client//开启读go client.read(*rowDataManagerNew)//开起写go client.write(*rowDataManagerNew)}// registerAlarmClient
//
// @Author zhaosy
// @Description: 注册报警客户端连接
// @date 2024-07-16 19:40:52
func registerAlarmClientConn(w http.ResponseWriter, r *http.Request) {//生成客户端conn, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Error(ctx, err.Error(), err)io.WriteString(w, "这是一个websocket,不是API.")return}clientId := guid.S()
//这个key是我随机生成的一个key,包含了一些业务,大家根据需要进行设置,也可以随机生成就行,就是一行字符串,如果对key没有要求,其实key不用处理也行哈key := websocketAlarmCachePrefix(clientId)//初始化客户端对象client := &client{key: key,socket: conn,send: make(chan []byte),}alarmDataManagerNew.register <- client//开启读go client.read(*alarmDataManagerNew)//开启写go client.write(*alarmDataManagerNew)}
上面看到
w http.ResponseWriter, r *http.Request 这两个参数应该就知道怎么做了吧,直接绑定到路由路由,也是官网那种方式
这是注册到go的http路由上了,后续通过path路径进行访问即可。
3、如何与我们的业务数据进行绑定?
还需要提供包函数出去
// SendRowDataBusinessData
//
// @Author zhaosy
// @Description: 接收业务数据进行推送到websocket
// @date 2024-07-16 18:19:46
func SendRowDataBusinessData(data model.BusinessData) {rowDataManagerNew.broadcast <- model.SetRowDataWsWrapper(data)
}// SendAlarmBusinessData
//
// @Author zhaosy
// @Description: 接收报警数据,推送到websocket客户端,这是我项目的业务,大家换成string即可
// @date 2024-07-16 19:42:13
func SendAlarmBusinessData(data model.AlarmBusinessData) {alarmDataManagerNew.broadcast <- model.SetAlarmWsWrapper(data)
}
仅供参考。
这里是业务数据推送websocket的入口:
4、最后一步是管理器要启动了,启动前,大家知道我写了 回调函数,要进行实现下,具体业务了,所以大家参考即可:
func init() {//启动原始数据websocketgo rowDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {if consts.ZERO == businessDataWrapper.DataType { //原始数据,推送businessData := businessDataWrapper.BusinessDataif businessData.BusinessId == "" {//进行转换成json字符串businessDataJsonStr, _ := json.Marshal(businessData)//广播所有客户端broadCastSend(manager, c, businessDataJsonStr)return true}//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的if strings.Contains(c.key, websocketRowDataCachePrefix(businessData.BusinessType, businessData.BusinessId, "", "")) {//进行转换成json字符串businessDataJsonStr, _ := json.Marshal(businessData)//广播指定客户端broadCastSend(manager, c, businessDataJsonStr)return true}return false}return false})//启动报警数据推送go alarmDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {// 报警数据,推送if consts.ONE == businessDataWrapper.DataType {alarmBusinessData := businessDataWrapper.AlarmBusinessDataif alarmBusinessData.BusinessId != "" {//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的if strings.Contains(c.key, websocketAlarmCachePrefix("")) {//进行转换成json字符串alarmBusinessDataJsonStr, _ := json.Marshal(alarmBusinessData)//广播指定客户端broadCastSend(manager, c, alarmBusinessDataJsonStr)return true}}}return false})
}
这样就可以了,启动整体就没问题了,我这边用的是goframe框架,所以我单独提供了这两个:
func NewWs() *webSocket {return &webSocket{}
}type webSocket struct {
}// RawDataWSHandle
//
// @Author zhaosy
// @Description: 原始数据websocket
// @date 2024-07-16 15:00:04
func (w *webSocket) RawDataWSHandle(r *ghttp.Request) {//获取参数businessType := r.Get("businessType")businessId := r.Get("businessId")userName := r.Get("userName")registerRawDataClientConn(r.Response.BufferWriter, r.Request, businessType.String(), businessId.String(), userName.String())
}// AlarmWSHandle
//
// @Author zhaosy
// @Description: 报警websocket处理器
// @date 2024-07-16 19:43:57
func (w *webSocket) AlarmWSHandle(r *ghttp.Request) {registerAlarmClientConn(r.Response.BufferWriter, r.Request)
}
在goframe里cmd里进行绑定:
//websocket--原始数据websocket推送s.BindHandler("/ws/rowdata/{businessType}/{businessId}/{userName}", websocket.NewWs().RawDataWSHandle)//websocket--报警数据websocket推送s.BindHandler("/ws/alarm", websocket.NewWs().AlarmWSHandle)
5、进行测试:
启动正常,日志也输出来了,进行测试
上面是连接正常,
通过业务数据进行测试:
以上就是本次研究的结果,整体上go的websocket比较简单,后面有机会,会重新进行重构,重构单独封装可以随意使用。
发一个整的代码:
// Package websocket
// @Author zhaosy
// @Date 2024/7/16 下午2:52:00
// @Desc websocket相关
package websocketimport ("context""encoding/json""fmt""github.com/gogf/gf/v2/frame/g""github.com/gogf/gf/v2/net/ghttp""github.com/gogf/gf/v2/util/guid""github.com/gorilla/websocket""io""net/http""skynet/internal/consts""skynet/internal/model""skynet/utility/lang""strings"
)var (ctx = context.TODO()log = g.Log()
)func init() {//启动原始数据websocketgo rowDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {if consts.ZERO == businessDataWrapper.DataType { //原始数据,推送businessData := businessDataWrapper.BusinessDataif businessData.BusinessId == "" {//进行转换成json字符串businessDataJsonStr, _ := json.Marshal(businessData)//广播所有客户端broadCastSend(manager, c, businessDataJsonStr)return true}//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的if strings.Contains(c.key, websocketRowDataCachePrefix(businessData.BusinessType, businessData.BusinessId, "", "")) {//进行转换成json字符串businessDataJsonStr, _ := json.Marshal(businessData)//广播指定客户端broadCastSend(manager, c, businessDataJsonStr)return true}return false}return false})//启动报警数据推送go alarmDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {// 报警数据,推送if consts.ONE == businessDataWrapper.DataType {alarmBusinessData := businessDataWrapper.AlarmBusinessDataif alarmBusinessData.BusinessId != "" {//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的if strings.Contains(c.key, websocketAlarmCachePrefix("")) {//进行转换成json字符串alarmBusinessDataJsonStr, _ := json.Marshal(alarmBusinessData)//广播指定客户端broadCastSend(manager, c, alarmBusinessDataJsonStr)return true}}}return false})
}func NewWs() *webSocket {return &webSocket{}
}// websocketRowDataCachePrefix
//
// @Author zhaosy
// @Description: 原始数据缓存前缀
// @date 2024-07-16 18:15:55
func websocketRowDataCachePrefix(businessType, businessId, userName, clientId string) string {key := "websocket:rowdata"if lang.IsNotEmpty(businessType) {key = key + ":" + businessTypeif lang.IsNotEmpty(businessId) {key = key + ":" + businessIdif lang.IsNotEmpty(userName) {key = key + ":" + userName}}}if lang.IsNotEmpty(clientId) {key = key + ":" + clientId}return key
}// websocketAlarmCachePrefix
//
// @Author zhaosy
// @Description: 报警数据前缀
// @date 2024-07-16 19:36:24
func websocketAlarmCachePrefix(clientId string) string {//后期要加组织机构,有权限控制这块,需要进行处理,暂时先不去处理key := "websocket:alarm"if lang.IsNotEmpty(clientId) {key = key + ":" + clientId}return key
}type webSocket struct {
}// RawDataWSHandle
//
// @Author zhaosy
// @Description: 原始数据websocket
// @date 2024-07-16 15:00:04
func (w *webSocket) RawDataWSHandle(r *ghttp.Request) {//获取参数businessType := r.Get("businessType")businessId := r.Get("businessId")userName := r.Get("userName")registerRawDataClientConn(r.Response.BufferWriter, r.Request, businessType.String(), businessId.String(), userName.String())
}// AlarmWSHandle
//
// @Author zhaosy
// @Description: 报警websocket处理器
// @date 2024-07-16 19:43:57
func (w *webSocket) AlarmWSHandle(r *ghttp.Request) {registerAlarmClientConn(r.Response.BufferWriter, r.Request)
}// SendRowDataBusinessData
//
// @Author zhaosy
// @Description: 接收业务数据进行推送到websocket
// @date 2024-07-16 18:19:46
func SendRowDataBusinessData(data model.BusinessData) {rowDataManagerNew.broadcast <- model.SetRowDataWsWrapper(data)
}// SendAlarmBusinessData
//
// @Author zhaosy
// @Description: 接收报警数据,推送到websocket客户端
// @date 2024-07-16 19:42:13
func SendAlarmBusinessData(data model.AlarmBusinessData) {alarmDataManagerNew.broadcast <- model.SetAlarmWsWrapper(data)
}// 初始化websocket协议配置
var upgrader = websocket.Upgrader{ReadBufferSize: 1024,WriteBufferSize: 1024,CheckOrigin: func(r *http.Request) bool { return true }, //允许跨域 、 允许同源
}// registerRawDataClientConn
//
// @Author zhaosy
// @Description: 注册原始数据客户端连接
// @date 2024-07-16 18:12:19
func registerRawDataClientConn(w http.ResponseWriter, r *http.Request, businessType string, businessId string, userName string) {if lang.IsEmpty(businessType) {io.WriteString(w, "businessType 不能为空")}//生成客户端conn, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Error(ctx, err.Error(), err)io.WriteString(w, "这是一个websocket连接,不是API.")return}clientId := guid.S()key := websocketRowDataCachePrefix(businessType, businessId, userName, clientId)//初始化客户端对象client := &client{key: key,socket: conn,send: make(chan []byte),}rowDataManagerNew.register <- client//开启读go client.read(*rowDataManagerNew)//开起写go client.write(*rowDataManagerNew)}// registerAlarmClient
//
// @Author zhaosy
// @Description: 注册报警客户端连接
// @date 2024-07-16 19:40:52
func registerAlarmClientConn(w http.ResponseWriter, r *http.Request) {//生成客户端conn, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Error(ctx, err.Error(), err)io.WriteString(w, "这是一个websocket,不是网站.")return}clientId := guid.S()key := websocketAlarmCachePrefix(clientId)//初始化客户端对象client := &client{key: key,socket: conn,send: make(chan []byte),}alarmDataManagerNew.register <- client//开启读go client.read(*alarmDataManagerNew)//开启写go client.write(*alarmDataManagerNew)}// 原始ws客户端管理器
var rowDataManagerNew = newClientManager()// 报警数据ws客户端管理器
var alarmDataManagerNew = newClientManager()// **********************************以下是websocket进行封装,可以直接使用******************************************// 创建客户端管理器
func newClientManager() *clientManager {return &clientManager{broadcast: make(chan model.BusinessDataWrapper),register: make(chan *client),unregister: make(chan *client),clients: make(map[*client]bool),}
}// clientManager
// @Description: 客户端管理者
type clientManager struct {//客户端存储的地方,我这边是用map进行存储,这块可以放到redis上,也是可以的,根据情况扩展即可//这里可以使用map[string]*client,也是可以的,如果这样设计,方便后期进行匹配比较简单,直接匹配key即可,这种方式也是可以的,匹配客户端对象里的keyclients map[*client]bool//广播数据链,进行业务限制,如果没有业务限制,直接使用[]byte 比较通用;用chan进行异步处理broadcast chan model.BusinessDataWrapper//客户端注册链;用chan进行异步处理register chan *client//客户端关闭链;用chan进行异步处理unregister chan *client
}// client
// @Description: 客户端信息
type client struct {//每个客户端连接后都要进行生成唯一key,因为业务场景需求,不同的用户或设备接受的数据要一一对应,后期这块要会做权限控制key string//客户端连接对象socket *websocket.Conn//数据发送链send chan []byte
}// start
//
// @Author zhaosy
// @Description: websocket启动
// @date 2024-07-17 10:55:28
func (m *clientManager) start(callBackFunc func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool) {for {select {case client := <-m.register: //进行连接m.clients[client] = truemsg := "有一个新连接出现,已连接成功,客户端key:" + client.keylog.Info(ctx, msg)//发送数据,这块可以不用发送,等后续进行注释即可//m.send([]byte(msg), client)case client := <-m.unregister: //关闭连接,进行释放资源if _, ok := m.clients[client]; ok {close(client.send)delete(m.clients, client)msg := "客户端key:" + client.key + ",已关闭"log.Info(ctx, msg)//m.send([]byte(msg), client)}case businessDataWrapper := <-m.broadcast: //广播数据for client := range m.clients {//这里其实就是发送数据了,//进行转换成json字符串//businessDataJsonStr, _ := json.Marshal(businessDataWrapper)//broadCastSend(m, client, businessDataJsonStr)//进行回调,我这里面目的为了后期这块不在调整代码了,在启动时进行业务处理,方便后期扩展用的,如果毕竟简单,直接使用broadCastSend(m, client, businessDataJsonStr)进行推送信息callBackFunc(client, m, businessDataWrapper)}}}
}// 广播进行发送数据
func broadCastSend(manager *clientManager, client *client, businessDataJsonByte []byte) {select {case client.send <- businessDataJsonByte:default:fmt.Println("关闭连接了,,,,,")close(client.send)delete(manager.clients, client)}
}// send
//
// @Author zhaosy
// @Description: 这快类似群发消息
// @date 2024-07-16 16:40:37
func (m *clientManager) send(message []byte, ignore *client) {for client := range m.clients {//ignore,这是忽略本身客户端,因为这是群发消息,自己可以不用接收了if client != ignore {//将数据写入到通道链client.send <- message}}
}func (c *client) read(manager clientManager) {defer func() {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "客户端进行关闭")}()for {_, message, err := c.socket.ReadMessage()if err != nil {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "读数据出现异常,直接关闭。")break}//后期可以注释掉log.Info(ctx, c.key, "接收到客户端发送的数据", string(message))//读到数据,进行业务操作,目前我这边项目只需要推送到客户端即可,所以暂时不做业务了,其他需要做业务,这里做个监听即可}
}// write
//
// @Author zhaosy
// @Description: 写入数据
// @date 2024-07-16 16:52:47
func (c *client) write(manager clientManager) {defer func() {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "客户端进行关闭")}()for {select {//如果客户端有数据要进行写出去case message, ok := <-c.send:if !ok {c.socket.WriteMessage(websocket.CloseMessage, []byte{})log.Info(ctx, c.key, "发送关闭提示")return}//这里才是真正的把数据推送到客户端err := c.socket.WriteMessage(websocket.TextMessage, message)if err != nil {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "数据写入失败,进行关闭!")break}}}
}
ok。结束
相关文章:
go 实现websocket以及详细设计流程过程,确保通俗易懂
websocket简介: WebSocket 是一种网络传输协议,可在单个 TCP 连接上进行全双工通信,位于 OSI 模型的应用层。WebSocket 协议在 2011 年由 IETF 标准化为 RFC 6455,后由 RFC 7936 补充规范。 WebSocket 使得客户端和服务器之间的数…...
记录工作中遇到的关于更新丢失商品超开的一个坑
场景: 工作中使用MybatisPlus以及Oracle进行数据库操作,收到RocketMQ消息开始并发分摊不同清货单的商品的批次,并对商品更新冻结数量。 上线后频繁出现商品超库存开票问题。(还好是内部业务,人工替换批次记账即可&…...
形状之美:WebKit中CSS形状的实现与创新
形状之美:WebKit中CSS形状的实现与创新 在网页设计的世界里,CSS形状(Shapes)是一种革命性的特性,它允许开发者使用几何形状来创建复杂的布局结构。WebKit,作为现代浏览器的核心引擎之一,对CSS形…...
项目管理进阶之RACI矩阵
前言 项目管理进阶系列续新篇。 RACI?这个是什么矩阵,有什么用途? 在项目管理过程中,如Team规模超5以上时,则有必要采用科学的管理方式,满足工作需要。否则可能事倍功半。 Q:什么是RACI矩阵 …...
docker: No space left on device处理与迁移目录
简介:工作中当遇到Docker容器内部的磁盘空间已满。可能的原因包括日志文件过大、临时文件过多或者是Docker容器的存储卷已满,需要我们及时清理相关文件,并对docker的路径进行迁移。 历史攻略: centos:清理磁盘空间 …...
设计模式使用场景实现示例及优缺点(结构型模式——外观模式)
在一个繁忙而复杂的城市中,有一座名为“技术森林”的巨大图书馆。这座图书馆里藏着各种各样的知识宝典,从古老的卷轴到现的电子书籍,无所不包。但是,图书馆之所以得名“技术森林”,是因为它的结构异常复杂,…...
Artix7系列FPGA实现SDI视频编解码+UDP以太网传输,基于GTP高速接口,提供工程源码和技术支持
目录 1、前言工程概述免责声明 2、相关方案推荐本博已有的 SDI 编解码方案本博已有的以太网方案本博已有的FPGA图像缩放方案本方案的缩放应用本方案在Xilinx--Kintex系列FPGA上的应用本方案在Xilinx--Zynq系列FPGA上的应用 3、详细设计方案设计原理框图SDI 输入设备Gv8601a 均衡…...
加拿大上市药品查询-加拿大药品数据库
在加拿大,药品的安全性、有效性和质量是受到严格监管的。根据《食品药品法案》的规定,所有药品制造商必须提供充分的科学证据,证明其产品的安全性和有效性。为此,加拿大卫生部建立了一个全面的药品数据库 (DPD) &#…...
qt自定义控件(QLabel)
先创建自定义控件类painter_label 1.自定义类必须给基类传入父窗口指针 2.重写控件中的方法 3.在UI中创建一个QLabel,右键“提升为”,输入类名...
阿里云国际站:海外视频安全的DRM加密
随着科技的进步,视频以直播或录播的形式陆续开展海外市场,从而也衍生出内容安全的问题,阿里云在这方面提供了完善的内容安全保护机制,适用于不同的场景,如在视频安全提供DRM加密。 由图可以了解到阿里云保护直播安全的…...
【Apache Doris】周FAQ集锦:第 15 期
【Apache Doris】周FAQ集锦:第 15 期 SQL问题数据操作问题运维常见问题其它问题关于社区 欢迎查阅本周的 Apache Doris 社区 FAQ 栏目! 在这个栏目中,每周将筛选社区反馈的热门问题和话题,重点回答并进行深入探讨。旨在为广大用户…...
verilog实现ram16*8 (vivado)
module ram_16x2 (input clk, // 时钟信号input we, // 写使能input en, // 使能信号input [3:0] addr, // 地址线input [1:0] datain, // 输入数据线output reg [1:0] dataout // 输出数据线 );// 定义存储器数组reg [1:0] mem [15:0];always (posedge…...
框架使用及下载
Bootstrap5 安装使用 | 菜鸟教程 (runoob.com) https://github.com/twbs/bootstrap/releases/download/v5.1.3/bootstrap-5.1.3-dist.zip(下载链接) Staticfile CDN(html的所有框架合集) 直接在w3cschool里面看参考文件进行搜索自…...
通用图形处理器设计GPGPU基础与架构(四)
一、前言 本文将介绍GPGPU中线程束的调度方案、记分牌方案和线程块的分配与调度方案。 二、线程束调度 在计算机中有很多资源,既可以是虚拟的计算资源,如线程、进程或数据流,也可以是硬件资源,如处理器、网络连接或 ALU 单元。调…...
会Excel就会sql?
如果你熟悉Excel,理解SQL(结构化查询语言,Structured Query Language)会相对容易,因为它们在某些功能上有着相似之处。SQL主要用于管理和操作数据库中的数据,而Excel则是电子表格软件,用于数据的组织、分析和可视化。下面我会用Excel的视角来帮你理解SQL的基本概念。 数…...
MyBatis-Plus的几种常见用法
MyBatis-Plus 提供了丰富的高级用法,可以简化开发,提高效率。以下是一些常见的可能会被忽略的用法示例。 1. 乐观锁 乐观锁用于避免在并发环境下数据更新冲突。MyBatis-Plus 通过注解和版本字段实现乐观锁。 示例: 在实体类中添加版本字段…...
【LeetCode】day15:110 - 平衡二叉树, 257 - 二叉树的所有路径, 404 - 左叶子之和, 222 - 完全二叉树的节点个数
LeetCode 代码随想录跟练 Day15 110.平衡二叉树257.二叉树的所有路径404.左叶子之和222.完全二叉树的节点个数 110.平衡二叉树 题目描述: 给定一个二叉树,判断它是否是 平衡二叉树 平衡二叉树的定义是,对于树中的每个节点,其左右…...
【网络安全的神秘世界】Error:Archives directory /var/cache/apt/archives/partial is missing.
🌝博客主页:泥菩萨 💖专栏:Linux探索之旅 | 网络安全的神秘世界 | 专接本 | 每天学会一个渗透测试工具 ✨问题描述 在kali中想要安装beef-xss软件包时,发生如下报错: Error: Archives directory /var/cac…...
网络编程中的TCP和UDP
什么是TCP协议 TCP( Transmission control protocol )即传输控制协议,是一种面向连接、可靠的数据传输协议,它是为了在不可靠的互联网上提供可靠的端到端字节流而专门设计的一个传输协议。 面向连接 :数据传输之前客户端和服务器端必须建立连…...
基于python的时空地理加权回归(GTWR)模型
一、时空地理加权回归(GTWR)模型 时空地理加权回归(GTWR)模型是由美国科罗拉多州立大学的Andy Liaw、Stanley A. Fiel和Michael E. Bock于2008年提出的一种高级空间统计分析方法。它是在传统地理加权回归(GWR…...
什么是单例模式,有哪些应用?
目录 一、定义 二、应用场景 三、6种实现方式 1、懒汉式,线程不安全。 2、懒汉式,线程安全 3、双检锁/双重校验锁(DCL,即 double-checked locking) 4、静态内部类方式-------只适用于静态域 5、饿汉式 6、枚举…...
adb命令操作手机各种开关
打开iqoo手机热点设置 adb shell am start -n com.android.settings/com.android.settings.Settings$\VivoTetherSettingsActivity蓝牙模块 检查蓝牙状态的ADB命令 检查蓝牙开关状态 adb shell settings get global bluetooth_on开启和关闭蓝牙 使用Intent操作蓝牙…...
【分布式存储系统HDFS】架构和使用
分布式存储系统HDFS:架构和使用 目录 引言HDFS简介HDFS的架构 NameNodeDataNodeSecondary NameNode HDFS的工作原理 数据读写流程数据冗余与恢复 HDFS的安装和配置 环境准备HDFS安装步骤HDFS配置文件启动HDFS HDFS的使用 基本命令HDFS Shell操作Java API操作 HDFS…...
Linux 实验一Linux系统安装
一、实验日期与地址 1、实验日期:2024年 2 月28 日 2、实验地址:S1-504 二、实验目的 1、掌握VMware Workstation建立虚拟机 2、掌握虚拟机环境下安装Centos 7 三、实验环境 VMware Workstation、Centos 7 四、实验内容 1、安装VMware Workstat…...
【人工智能】深度剖析AI伦理:强化隐私防线,推动算法公平性的核心议题
文章目录 🍊1 人工智能兴起背后的伦理及道德风险1.1 算法偏见与歧视1.2 数据隐私侵权1.3 透明度受限1.4 决策失衡1.5 AI生成内容的危险性 🍊2 建构AIGC伦理观:实现人机共创的永续提升2.1 技术手段与伦理预防2.2 即时警告与紧急关停措施2.3 法…...
如何解决微服务下引起的 分布式事务问题
一、什么是分布式事务? 虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。 跨数据源的分布式事务 跨服务的分布式事务 二、解决方…...
牛客周赛50轮+cf955+abc363
D-小红的因式分解_牛客周赛 Round 50 (nowcoder.com) 思路: 巨蠢的题目,ax^2bxca1*a2*x^2(b1*a2b2*a1)xb1*b2,即: aa1*a2,ba1*b2a2*b1,cb1*b2 数据范围很小,直接暴力枚举吧(注意条件) 代码…...
【MySQL】:对库和表的基本操作方法
数据库使用的介绍 什么是SQL 学习数据库的使用——>基于 SQL编程语言 来对数据库进行操作 重点表述的是“需求”,期望得到什么结果。(至于结果是如何得到的,并不关键,都是数据库服务器在背后做好了) 重点表述的是…...
Library not found for -lstdc++.6.0.9
解决方案一 由于项目已经很多年了,前段时间更新了Xcode发现编译报错lstdc这个库很早以前就被舍弃了,但是一个项目的维护都随着解决bug堆砌出来的,这也导致了我们的项目走上了这条路。 比如 Library not found for -lstdc.6.0.9 报的错&#x…...
防火墙之双机热备篇
为什么要在防火墙上配置双机热备技术呢? 相信大家都知道,为了提高可靠性,避免单点故障 肯定有聪明的小伙伴会想到那为什么不直接多配置两台防火墙,然后再将他们进行线路冗余,不就完成备份了吗? 答案是不…...
蚌埠市做网站/宁波专业seo服务
文章目录一、编写项目代码二、编写Azkaban必须文件三、使用Azkaban执行项目代码一、编写项目代码 package Azkaban;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arr…...
行业网站作用/营销是做什么
docker的网络Docker 安装时会自动在host上创建三个网络,我们可用 docker network ls命令查看:[rootlocalhost ~]# docker network lsNETWORK ID NAME DRIVER SCOPE0164da7ee66a bridge bridge…...
web技术的网站开发/服务推广软文
机械迷城手机免费版完整版下载手游是一款拥有十分独特的铅笔画风制作的冒险解密手机游戏。游戏讲述了一个需要不断对付各种黑帽黑帮的小机器的故事。这里的许多角色都是机器人设计的,所以你需要拯救你的女朋友。与此同时,你将不断地与更多的坏人打交道&a…...
wordpress网站发布/百度新闻最新消息
词法作用域之外执行函数并可以访问所在词法作用域 当函数可以记住并访问所在的词法作用域,即使函数是在当前词法作用域之外执行,这时 就产生了闭包。 如果没能认出闭包,也不了解它的工作原理,在使用它的过程中就很容易犯错&#…...
汕头市住建局/seo外包软件
比较明显的网络流最小割模型,对于这种模型我们需要先求获利的和,然后减去代价即可。 我们对于第i个人来说, 如果选他,会耗费A[I]的代价,那么(source,i,a[i])代表选他之后的代价,如果不选他,我们…...
wordpress 获取时间/私人网站服务器
题目链接 长度为\(i\)的不降子序列个数是可以DP求的。 用\(f[i][j]\)表示长度为\(i\),结尾元素为\(a_j\)的不降子序列个数。转移为\(f[i][j]\sum f[i-1][k]\),其中\(k\)满足\(k<j\)且\(a_k\leq a_j\),可以用树状数组\(O(n^2\log n)\)解决。…...