netty与websockt实现聊天
配置websockt:
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;/*** websocket配置*/
@Data
@Configuration
@ConfigurationProperties(prefix = "ws")
public class WsConfig {/*** websockt服务端口,不可和web服务同一个端口*/private Integer port=8779;/*** 心跳超时时间-单位-秒*/private Integer heartTimeout = 60;/*** 默认匹配的路径*/private String url="/";
}
返回实体载体:
import com.alibaba.fastjson2.JSON;
import lombok.Data;import java.io.Serializable;@Data
public class MsgBody implements Serializable {public enum MsgType{/*** 普通文字消息*/text,/*** 图片消息*/img,/*** 文件*/file,}public enum Type{/*** 自己*/self,/*** 别人*/other,}private Type type;private MsgType msgType;/*** 消息主体*/private String msgContent;public String toJson(){return JSON.toJSONString(this);}
}
import com.alibaba.fastjson2.JSON;import java.io.Serializable;
import java.util.LinkedHashMap;/*** websocket返回载体*/
public class WsBean extends LinkedHashMap<String,Object> implements Serializable {/*** 指定调用前端的回调函数*/public enum CallbackEm{/*** 通知回调函数*/notice,/*** 收到消息的回调*/receive_msg,}public WsBean(CallbackEm callbackEm,Object data) {super();this.put("code",callbackEm.name());this.put("data",data);}public static WsBean get(CallbackEm callbackEm){return new WsBean(callbackEm,"");}public static WsBean get(CallbackEm callbackEm,Object data){return new WsBean(callbackEm,data);}public WsBean setData(Object data){this.put("data",data);return this;}public WsBean set(String key,Object value){this.put(key,value);return this;}public String toJson(){return JSON.toJSONString(this);}}
netty通道:
import cn.hutool.core.map.BiMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** 这个类用来处理用户和连接的关联关系*/
@Slf4j
@Component
public class NioWebSocketChannelPool {/*** 用户保持连接*/private final DefaultChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 保持连接用户对应的长连接id,此为双向绑定*/private final BiMap<String, ChannelId> bindUserMap = new BiMap<>(new ConcurrentHashMap<>());/*** 新增一个客户端通道*/public void addChannel(Channel channel){channels.add(channel);}/*** 移除一个客户端通道* @param channel*/public void removeChannel(Channel channel){String mapKey = bindUserMap.getKey(channel.id());if (mapKey != null){bindUserMap.remove(mapKey);}channels.remove(channel);}/*** 绑定用户*/public void bindUser(String userId,Channel channel){bindUserMap.put(userId,channel.id());}/*** 向用户推送消息*/public void sendToUser(String userId,WsBean data){ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}}public void sendToUser(String userId,MsgBody data){ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}}public BiMap<String,ChannelId> getBindUserMap(){return bindUserMap;}/*** 群发推送消息*/public void writeAndFlush(WsBean data){Set<String> onlineIds = getBindUserMap().keySet();onlineIds.forEach(userId->{ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}});}}
netty处理类:
import cn.hutool.core.util.StrUtil;
import com.xx.framework.config.WsConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** 处理端*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class NioWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {@Resourceprivate WsConfig wsConfig;@Resourceprivate NioWebSocketChannelPool webSocketChannelPool;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接:{}",ctx.channel().id());webSocketChannelPool.addChannel(ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("客户端端口连接:{}",ctx.channel().id());webSocketChannelPool.removeChannel(ctx.channel());super.channelInactive(ctx);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.channel().flush();super.channelReadComplete(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("客户端请求数据类型:{}",msg.getClass());if (msg instanceof FullHttpRequest){fullHttpRequestHandler(ctx,(FullHttpRequest)msg);}super.channelRead(ctx, msg);}/*** 处理连接请求,客户端websockt发送握手包时会执行第一次请求* @param ctx* @param request*/private void fullHttpRequestHandler(ChannelHandlerContext ctx, FullHttpRequest request) {String uri = request.getUri();Map<String,String> params = getParams(uri);log.info("客户端请求参数:{}",params);/*** 判断请求路径是否跟配置中的一致*/if (wsConfig.getUrl().equals(getBasePath(uri))){/*** 因为有可能携带了参数,导致客户端一致无法返回握手包,因此在校验通过后,重置请求路径*/request.setUri(wsConfig.getUrl());}else{ctx.close();}String userId = params.get("user_id");if (StrUtil.isBlank(userId)){log.info("用户ID为空,无法登录");return;}webSocketChannelPool.bindUser(userId,ctx.channel());}/*** 获取URI中参数以外部分路径* @param uri* @return*/private String getBasePath(String uri) {if (uri == null || uri.isEmpty()){return null;}int idx = uri.indexOf("?");if (idx == -1){return uri;}return uri.substring(0,idx);}/*** 请路径参数转换成Map对象,如果路径参数出现重复参数名,将以最后的参数值为准* @param uri* @return*/private Map<String, String> getParams(String uri) {Map<String,String> params = new HashMap<>(10);int idx = uri.indexOf("?");if (idx != -1){String[] paramsArr = uri.substring(idx+1).split("&");for (String param:paramsArr){idx = param.indexOf("=");params.put(param.substring(0,idx),param.substring(idx+1));}}return params;}/*** 客户端发送断开请求处理*/private void closeWebSocketFrameHandler(ChannelHandlerContext ctx, CloseWebSocketFrame frame){ctx.close();}/*** 创建连接之后,客户端发送的消息都会在这里处理*/private void textWebSocketFrameHandler(ChannelHandlerContext ctx, TextWebSocketFrame frame){//客户端发送过来的内容不进行业务处理,原样返回,一般不做处理
// log.info("收到客户端信息-channelId: {}, 消息内容: {}", ctx.channel().id(), frame.text());}/*** 处理客户端心跳包*/private void pingWebSocketFrameHandler(ChannelHandlerContext ctx, PingWebSocketFrame frame){ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {//根据请求数量类型进厂分发处理if (webSocketFrame instanceof PingWebSocketFrame){PingWebSocketFrame pingWebSocketFrame = (PingWebSocketFrame) webSocketFrame;pingWebSocketFrameHandler(channelHandlerContext,pingWebSocketFrame);}else if (webSocketFrame instanceof TextWebSocketFrame){TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;textWebSocketFrameHandler(channelHandlerContext,textWebSocketFrame);}else if (webSocketFrame instanceof CloseWebSocketFrame){CloseWebSocketFrame closeWebSocketFrame = (CloseWebSocketFrame) webSocketFrame;closeWebSocketFrameHandler(channelHandlerContext,closeWebSocketFrame);}}
}
netty服务类:
package com.xx.framework.ws;import com.xx.framework.config.WsConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 服务端*/
@Slf4j
@Component
public class NioWebSocketServer implements InitializingBean , DisposableBean, Ordered {@Resourceprivate WsConfig wsConfig;@Resourceprivate NioWebSocketHandler nioWebSocketHandler;private EventLoopGroup bossGroup;private EventLoopGroup workGroup;private ChannelFuture channelFuture;@Overridepublic void destroy() throws Exception {log.info("shutting down netty server....");if (bossGroup != null){bossGroup.shutdownGracefully().sync();}if (workGroup != null){workGroup.shutdownGracefully().sync();}if (channelFuture != null){channelFuture.channel().closeFuture().syncUninterruptibly();}log.info("netty server shutdown");}@Overridepublic void afterPropertiesSet() throws Exception {try{bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.option(ChannelOption.SO_BACKLOG,1024).group(bossGroup,workGroup).channel(NioServerSocketChannel.class).localAddress(wsConfig.getPort()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()).addLast(new HttpObjectAggregator(8192)).addLast(nioWebSocketHandler).addLast(new WebSocketServerProtocolHandler(wsConfig.getUrl(),null,true));}});channelFuture = serverBootstrap.bind().sync();}finally {if (channelFuture != null && channelFuture.isSuccess()){log.info("netty server startup on port:{} (websockt)with context path '{}'",wsConfig.getPort(),"/");}else{log.info("netty server startup failed");if (bossGroup != null){bossGroup.shutdownGracefully().sync();}if (workGroup != null){workGroup.shutdownGracefully().sync();}}}}@Overridepublic int getOrder() {return 0;}
}
以上代码serviceImpl应用:
/*** 新增聊天记录** @param chatRecord 聊天记录* @return 结果*/@Overridepublic int insertChatRecord(ChatRecord chatRecord) {MsgBody msgBody = new MsgBody();if (StrUtil.isNotBlank(chatRecord.getMsgType())){if (chatRecord.getMsgType().equals(MsgTypeEnum.TEXT.getMsgType())){msgBody.setMsgType(MsgBody.MsgType.text);}else if (chatRecord.getMsgType().equals(MsgTypeEnum.IMG.getMsgType())){msgBody.setMsgType(MsgBody.MsgType.img);}}msgBody.setMsgContent(chatRecord.getMsgContent());chatRecord.setMsgContent(JSON.toJSONString(msgBody));chatRecord.setId(IdUtils.getLongId());chatRecord.setCreateTime(DateUtils.getNowDate());int insert = chatRecordMapper.insertChatRecord(chatRecord);if(insert > 0){msgBody.setType(MsgBody.Type.self);//通知浏览器用户 webSocketChannelPool.sendToUser(chatRecord.getSendUserId().toString(),msgBody);msgBody.setType(MsgBody.Type.other);webSocketChannelPool.sendToUser(chatRecord.getReceiveUserId().toString(),msgBody);}return insert;}
相关文章:
netty与websockt实现聊天
配置websockt: import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;/*** websocket配置*/ Data Configuration ConfigurationProperties(prefix &qu…...
21.2 CSS 三大特性与页面布局
1. 开发者工具修改样式 使用开发者工具修改样式, 操作步骤如下: * 1. 打开开发者工具: 在浏览器中右键点击页面, 然后选择检查或者使用快捷键(一般是 F12 或者 CtrlShiftI)来打开开发者工具.* 2. 打开样式编辑器: 在开发者工具中, 找到选项卡或面板, 一般是Elements或者Elemen…...
MySQL 特殊语法时间格式以及Greadb连接
一、时间语法 DATE_FORMAT和to_char() select to_char(now(),%Y-%m-%d %H:%i:%s) from dual; select DATE_FORMAT(now(),%Y-%m-%d %H:%i:%s) from dual; 2.to_date() 和STR_TO_DATE(#{date},%Y-%m-%d ) select to_date(now(),yyyy-mm-dd hh24:mi:ss) from dual;...
Python(.pyc)反编译:pycdc工具安装与使用
本文将介绍如何将python的.pyc文件反编译成源码,以便我们对源码的学习与改进。pycdc工具安装 下载地址: 1、Github地址:https://github.com/zrax/pycdc ,下载后需要使用CMake进行编译。 2、已下载好及编译好的地址:ht…...
山西电力市场日前价格预测【2023-08-28】
日前价格预测 预测明日(2023-08-28)山西电力市场全天平均日前电价为319.70元/MWh。其中,最高日前电价为371.80元/MWh,预计出现在19: 15。最低日前电价为278.59元/MWh,预计出现在13: 00。 价差方向预测 1: …...
python3/pip3 SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed
环境: mac os 背景: 电脑之前安装的是python3.9 , 现在升级到python3.10。 从python官网下载macos版本的python3.10 pkg。 双击安装。 程序使用aiohttp访问ebay 。 出错: aiohttp.client_exceptions.ClientConnectorCertifi…...
Python中的迭代器与生成器
文章目录 1、迭代器2、生成器3、列表推导式和生成器表达式4、enumerate() 在Python中,迭代器(Iterator)和生成器(Generator)是两种用于处理可迭代对象的重要工具。而可迭代对象包括列表,元组,字…...
简单着色器编写(下)
函数部分介绍完了,最后来介绍一下main函数中的部分。 std::string vertexShader "#version 330 core\n" "\n" "layout(location0)in vec4 position;" "\n" "void main()\n" "{\n&…...
go并发编程基础
go并发编程 1waitgroup WaitGroup就是等待所有的goroutine全部执行完毕,add方式和Down方法要配套使用 package mainimport ("fmt""sync" )func main() {var wq sync.WaitGroupwq.Add(100) //监控多少个goroutine执行结束for i: 0;i<100;…...
PHP之 导入excel表格时,获取日期时间变成浮点数
读取到的时间 float(0.20833333333333) 原格式 15:00:00 代码 if (Request::isPost()) {$file_url input(upfile); // 本地上传文件地址// 读取文件内容$local_file_url __dir__./../../../public.$file_url;// $spreadsheet new Spreadsheet();// $sheet $spreadsheet-…...
学习 Java 报表技术导入 Maven 依赖出错:jacob 无法下载、jasperreports 依赖错误
发生缘由 最近在做一个可视化项目,用到了 Java 报表技术。在跟着「黑马」课程导入 pom.xml 文件的时候提示下载依赖错误。 com.jacob 包无法下载Failed to read artifact descriptor for com.lowagie:itext:jar:2.1.7.js6 运行环境 电脑系统版本:Win…...
力扣-哈希-最长连续序列
题目 给定一个未排序的整数数组 nums ,找出数字连续的最长序列(不要求序列元素在原数组中连续)的长度。 请你设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 1: **输入:**nums [100,4,200,1,3,2] **输出&a…...
Java线程 - 详解(1)
一,创建线程 方法一:继承Thread类 class MyThread extends Thread{Overridepublic void run() {System.out.println("线程1");} }public class Test {public static void main(String[] args) {MyThread myThread new MyThread();myThread.…...
结构体-C语言(初阶)
目录 一、结构体声明 1.1 结构概念 1.2 结构声明 1.3 结构成员的类型 1.4 结构体变量的定义和初始化 二、结构体成员的访问 2.1 结构体变量访问成员 2.2 结构体指针访问指向变量的成员 三、结构体传参 一、结构体声明 1.1 结构概念 结构是一些值的集合,这些值称为…...
【网络】HTTPS的加密
目录 第一组,非对称加密第二组,非对称加密第三组,对称加密证书签名 HTTPS使用的是非对称加密加对称加密的方案 (非对称加密:公钥加/解密,私钥解/加密) (对称加密:一组对称…...
Nacos安装指南
Nacos安装指南 1.Windows安装 开发阶段采用单机安装即可。 1.1.下载安装包 在Nacos的GitHub页面,提供有下载链接,可以下载编译好的Nacos服务端或者源代码: GitHub主页:https://github.com/alibaba/nacos GitHub的Release下载…...
java-Optional 类详解
目录 前言 Optional的构造方法 Optional的相关方法介绍 isPresent用法: get用法: filter用法: orElse用法: orElseGet用法 orElseThrow用法 map用法 flatMap用法: 前言 Optional 类是java8的新特性࿰…...
sql数据库怎么备份,sql 实时备份
在当今互联网时代,数据已经成为企业的核心资产。然而,数据的安全性和完整性面临硬件问题、软件故障、人工操作错误等各种威胁。为了保证数据的安全,实时备份已经成为公司必须采取的重要措施之一。下面我们就重点介绍SQL实时备份的重要实施方法…...
RK3399平台开发系列讲解(存储篇)Linux 存储系统的 I/O 栈
平台内核版本安卓版本RK3399Linux4.4Android7.1🚀返回专栏总目录 文章目录 一、Linux 存储系统全景二、Linux 存储系统的缓存沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将介绍 Linux 存储系统的 I/O 原理。 一、Linux 存储系统全景 我们可以把 Linux 存储系…...
Java“牵手”天猫淘口令转换API接口数据,天猫API接口申请指南
天猫平台商品淘口令接口是开放平台提供的一种API接口,通过调用API接口,开发者可以获取天猫商品的标题、价格、库存、商品快递费用,宝贝ID,发货地,区域ID,快递费用,月销量、总销量、库存、详情描…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...
通过 Ansible 在 Windows 2022 上安装 IIS Web 服务器
拓扑结构 这是一个用于通过 Ansible 部署 IIS Web 服务器的实验室拓扑。 前提条件: 在被管理的节点上安装WinRm 准备一张自签名的证书 开放防火墙入站tcp 5985 5986端口 准备自签名证书 PS C:\Users\azureuser> $cert New-SelfSignedCertificate -DnsName &…...
mac:大模型系列测试
0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何,是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试,是可以跑通文章里面的代码。训练速度也是很快的。 注意…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
