netty与websockt实现聊天
配置websockt:
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;/*** websocket配置*/
@Data
@Configuration
@ConfigurationProperties(prefix = "ws")
public class WsConfig {/*** websockt服务端口,不可和web服务同一个端口*/private Integer port=8779;/*** 心跳超时时间-单位-秒*/private Integer heartTimeout = 60;/*** 默认匹配的路径*/private String url="/";
}
返回实体载体:
import com.alibaba.fastjson2.JSON;
import lombok.Data;import java.io.Serializable;@Data
public class MsgBody implements Serializable {public enum MsgType{/*** 普通文字消息*/text,/*** 图片消息*/img,/*** 文件*/file,}public enum Type{/*** 自己*/self,/*** 别人*/other,}private Type type;private MsgType msgType;/*** 消息主体*/private String msgContent;public String toJson(){return JSON.toJSONString(this);}
}
import com.alibaba.fastjson2.JSON;import java.io.Serializable;
import java.util.LinkedHashMap;/*** websocket返回载体*/
public class WsBean extends LinkedHashMap<String,Object> implements Serializable {/*** 指定调用前端的回调函数*/public enum CallbackEm{/*** 通知回调函数*/notice,/*** 收到消息的回调*/receive_msg,}public WsBean(CallbackEm callbackEm,Object data) {super();this.put("code",callbackEm.name());this.put("data",data);}public static WsBean get(CallbackEm callbackEm){return new WsBean(callbackEm,"");}public static WsBean get(CallbackEm callbackEm,Object data){return new WsBean(callbackEm,data);}public WsBean setData(Object data){this.put("data",data);return this;}public WsBean set(String key,Object value){this.put(key,value);return this;}public String toJson(){return JSON.toJSONString(this);}}
netty通道:
import cn.hutool.core.map.BiMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** 这个类用来处理用户和连接的关联关系*/
@Slf4j
@Component
public class NioWebSocketChannelPool {/*** 用户保持连接*/private final DefaultChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 保持连接用户对应的长连接id,此为双向绑定*/private final BiMap<String, ChannelId> bindUserMap = new BiMap<>(new ConcurrentHashMap<>());/*** 新增一个客户端通道*/public void addChannel(Channel channel){channels.add(channel);}/*** 移除一个客户端通道* @param channel*/public void removeChannel(Channel channel){String mapKey = bindUserMap.getKey(channel.id());if (mapKey != null){bindUserMap.remove(mapKey);}channels.remove(channel);}/*** 绑定用户*/public void bindUser(String userId,Channel channel){bindUserMap.put(userId,channel.id());}/*** 向用户推送消息*/public void sendToUser(String userId,WsBean data){ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}}public void sendToUser(String userId,MsgBody data){ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}}public BiMap<String,ChannelId> getBindUserMap(){return bindUserMap;}/*** 群发推送消息*/public void writeAndFlush(WsBean data){Set<String> onlineIds = getBindUserMap().keySet();onlineIds.forEach(userId->{ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}});}}
netty处理类:
import cn.hutool.core.util.StrUtil;
import com.xx.framework.config.WsConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** 处理端*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class NioWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {@Resourceprivate WsConfig wsConfig;@Resourceprivate NioWebSocketChannelPool webSocketChannelPool;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接:{}",ctx.channel().id());webSocketChannelPool.addChannel(ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("客户端端口连接:{}",ctx.channel().id());webSocketChannelPool.removeChannel(ctx.channel());super.channelInactive(ctx);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.channel().flush();super.channelReadComplete(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("客户端请求数据类型:{}",msg.getClass());if (msg instanceof FullHttpRequest){fullHttpRequestHandler(ctx,(FullHttpRequest)msg);}super.channelRead(ctx, msg);}/*** 处理连接请求,客户端websockt发送握手包时会执行第一次请求* @param ctx* @param request*/private void fullHttpRequestHandler(ChannelHandlerContext ctx, FullHttpRequest request) {String uri = request.getUri();Map<String,String> params = getParams(uri);log.info("客户端请求参数:{}",params);/*** 判断请求路径是否跟配置中的一致*/if (wsConfig.getUrl().equals(getBasePath(uri))){/*** 因为有可能携带了参数,导致客户端一致无法返回握手包,因此在校验通过后,重置请求路径*/request.setUri(wsConfig.getUrl());}else{ctx.close();}String userId = params.get("user_id");if (StrUtil.isBlank(userId)){log.info("用户ID为空,无法登录");return;}webSocketChannelPool.bindUser(userId,ctx.channel());}/*** 获取URI中参数以外部分路径* @param uri* @return*/private String getBasePath(String uri) {if (uri == null || uri.isEmpty()){return null;}int idx = uri.indexOf("?");if (idx == -1){return uri;}return uri.substring(0,idx);}/*** 请路径参数转换成Map对象,如果路径参数出现重复参数名,将以最后的参数值为准* @param uri* @return*/private Map<String, String> getParams(String uri) {Map<String,String> params = new HashMap<>(10);int idx = uri.indexOf("?");if (idx != -1){String[] paramsArr = uri.substring(idx+1).split("&");for (String param:paramsArr){idx = param.indexOf("=");params.put(param.substring(0,idx),param.substring(idx+1));}}return params;}/*** 客户端发送断开请求处理*/private void closeWebSocketFrameHandler(ChannelHandlerContext ctx, CloseWebSocketFrame frame){ctx.close();}/*** 创建连接之后,客户端发送的消息都会在这里处理*/private void textWebSocketFrameHandler(ChannelHandlerContext ctx, TextWebSocketFrame frame){//客户端发送过来的内容不进行业务处理,原样返回,一般不做处理
// log.info("收到客户端信息-channelId: {}, 消息内容: {}", ctx.channel().id(), frame.text());}/*** 处理客户端心跳包*/private void pingWebSocketFrameHandler(ChannelHandlerContext ctx, PingWebSocketFrame frame){ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {//根据请求数量类型进厂分发处理if (webSocketFrame instanceof PingWebSocketFrame){PingWebSocketFrame pingWebSocketFrame = (PingWebSocketFrame) webSocketFrame;pingWebSocketFrameHandler(channelHandlerContext,pingWebSocketFrame);}else if (webSocketFrame instanceof TextWebSocketFrame){TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;textWebSocketFrameHandler(channelHandlerContext,textWebSocketFrame);}else if (webSocketFrame instanceof CloseWebSocketFrame){CloseWebSocketFrame closeWebSocketFrame = (CloseWebSocketFrame) webSocketFrame;closeWebSocketFrameHandler(channelHandlerContext,closeWebSocketFrame);}}
}
netty服务类:
package com.xx.framework.ws;import com.xx.framework.config.WsConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 服务端*/
@Slf4j
@Component
public class NioWebSocketServer implements InitializingBean , DisposableBean, Ordered {@Resourceprivate WsConfig wsConfig;@Resourceprivate NioWebSocketHandler nioWebSocketHandler;private EventLoopGroup bossGroup;private EventLoopGroup workGroup;private ChannelFuture channelFuture;@Overridepublic void destroy() throws Exception {log.info("shutting down netty server....");if (bossGroup != null){bossGroup.shutdownGracefully().sync();}if (workGroup != null){workGroup.shutdownGracefully().sync();}if (channelFuture != null){channelFuture.channel().closeFuture().syncUninterruptibly();}log.info("netty server shutdown");}@Overridepublic void afterPropertiesSet() throws Exception {try{bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.option(ChannelOption.SO_BACKLOG,1024).group(bossGroup,workGroup).channel(NioServerSocketChannel.class).localAddress(wsConfig.getPort()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()).addLast(new HttpObjectAggregator(8192)).addLast(nioWebSocketHandler).addLast(new WebSocketServerProtocolHandler(wsConfig.getUrl(),null,true));}});channelFuture = serverBootstrap.bind().sync();}finally {if (channelFuture != null && channelFuture.isSuccess()){log.info("netty server startup on port:{} (websockt)with context path '{}'",wsConfig.getPort(),"/");}else{log.info("netty server startup failed");if (bossGroup != null){bossGroup.shutdownGracefully().sync();}if (workGroup != null){workGroup.shutdownGracefully().sync();}}}}@Overridepublic int getOrder() {return 0;}
}
以上代码serviceImpl应用:
/*** 新增聊天记录** @param chatRecord 聊天记录* @return 结果*/@Overridepublic int insertChatRecord(ChatRecord chatRecord) {MsgBody msgBody = new MsgBody();if (StrUtil.isNotBlank(chatRecord.getMsgType())){if (chatRecord.getMsgType().equals(MsgTypeEnum.TEXT.getMsgType())){msgBody.setMsgType(MsgBody.MsgType.text);}else if (chatRecord.getMsgType().equals(MsgTypeEnum.IMG.getMsgType())){msgBody.setMsgType(MsgBody.MsgType.img);}}msgBody.setMsgContent(chatRecord.getMsgContent());chatRecord.setMsgContent(JSON.toJSONString(msgBody));chatRecord.setId(IdUtils.getLongId());chatRecord.setCreateTime(DateUtils.getNowDate());int insert = chatRecordMapper.insertChatRecord(chatRecord);if(insert > 0){msgBody.setType(MsgBody.Type.self);//通知浏览器用户 webSocketChannelPool.sendToUser(chatRecord.getSendUserId().toString(),msgBody);msgBody.setType(MsgBody.Type.other);webSocketChannelPool.sendToUser(chatRecord.getReceiveUserId().toString(),msgBody);}return insert;}
相关文章:
netty与websockt实现聊天
配置websockt: import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;/*** websocket配置*/ Data Configuration ConfigurationProperties(prefix &qu…...
21.2 CSS 三大特性与页面布局
1. 开发者工具修改样式 使用开发者工具修改样式, 操作步骤如下: * 1. 打开开发者工具: 在浏览器中右键点击页面, 然后选择检查或者使用快捷键(一般是 F12 或者 CtrlShiftI)来打开开发者工具.* 2. 打开样式编辑器: 在开发者工具中, 找到选项卡或面板, 一般是Elements或者Elemen…...
MySQL 特殊语法时间格式以及Greadb连接
一、时间语法 DATE_FORMAT和to_char() select to_char(now(),%Y-%m-%d %H:%i:%s) from dual; select DATE_FORMAT(now(),%Y-%m-%d %H:%i:%s) from dual; 2.to_date() 和STR_TO_DATE(#{date},%Y-%m-%d ) select to_date(now(),yyyy-mm-dd hh24:mi:ss) from dual;...
Python(.pyc)反编译:pycdc工具安装与使用
本文将介绍如何将python的.pyc文件反编译成源码,以便我们对源码的学习与改进。pycdc工具安装 下载地址: 1、Github地址:https://github.com/zrax/pycdc ,下载后需要使用CMake进行编译。 2、已下载好及编译好的地址:ht…...
山西电力市场日前价格预测【2023-08-28】
日前价格预测 预测明日(2023-08-28)山西电力市场全天平均日前电价为319.70元/MWh。其中,最高日前电价为371.80元/MWh,预计出现在19: 15。最低日前电价为278.59元/MWh,预计出现在13: 00。 价差方向预测 1: …...
python3/pip3 SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed
环境: mac os 背景: 电脑之前安装的是python3.9 , 现在升级到python3.10。 从python官网下载macos版本的python3.10 pkg。 双击安装。 程序使用aiohttp访问ebay 。 出错: aiohttp.client_exceptions.ClientConnectorCertifi…...
Python中的迭代器与生成器
文章目录 1、迭代器2、生成器3、列表推导式和生成器表达式4、enumerate() 在Python中,迭代器(Iterator)和生成器(Generator)是两种用于处理可迭代对象的重要工具。而可迭代对象包括列表,元组,字…...
简单着色器编写(下)
函数部分介绍完了,最后来介绍一下main函数中的部分。 std::string vertexShader "#version 330 core\n" "\n" "layout(location0)in vec4 position;" "\n" "void main()\n" "{\n&…...
go并发编程基础
go并发编程 1waitgroup WaitGroup就是等待所有的goroutine全部执行完毕,add方式和Down方法要配套使用 package mainimport ("fmt""sync" )func main() {var wq sync.WaitGroupwq.Add(100) //监控多少个goroutine执行结束for i: 0;i<100;…...
PHP之 导入excel表格时,获取日期时间变成浮点数
读取到的时间 float(0.20833333333333) 原格式 15:00:00 代码 if (Request::isPost()) {$file_url input(upfile); // 本地上传文件地址// 读取文件内容$local_file_url __dir__./../../../public.$file_url;// $spreadsheet new Spreadsheet();// $sheet $spreadsheet-…...
学习 Java 报表技术导入 Maven 依赖出错:jacob 无法下载、jasperreports 依赖错误
发生缘由 最近在做一个可视化项目,用到了 Java 报表技术。在跟着「黑马」课程导入 pom.xml 文件的时候提示下载依赖错误。 com.jacob 包无法下载Failed to read artifact descriptor for com.lowagie:itext:jar:2.1.7.js6 运行环境 电脑系统版本:Win…...
力扣-哈希-最长连续序列
题目 给定一个未排序的整数数组 nums ,找出数字连续的最长序列(不要求序列元素在原数组中连续)的长度。 请你设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 1: **输入:**nums [100,4,200,1,3,2] **输出&a…...
Java线程 - 详解(1)
一,创建线程 方法一:继承Thread类 class MyThread extends Thread{Overridepublic void run() {System.out.println("线程1");} }public class Test {public static void main(String[] args) {MyThread myThread new MyThread();myThread.…...
结构体-C语言(初阶)
目录 一、结构体声明 1.1 结构概念 1.2 结构声明 1.3 结构成员的类型 1.4 结构体变量的定义和初始化 二、结构体成员的访问 2.1 结构体变量访问成员 2.2 结构体指针访问指向变量的成员 三、结构体传参 一、结构体声明 1.1 结构概念 结构是一些值的集合,这些值称为…...
【网络】HTTPS的加密
目录 第一组,非对称加密第二组,非对称加密第三组,对称加密证书签名 HTTPS使用的是非对称加密加对称加密的方案 (非对称加密:公钥加/解密,私钥解/加密) (对称加密:一组对称…...
Nacos安装指南
Nacos安装指南 1.Windows安装 开发阶段采用单机安装即可。 1.1.下载安装包 在Nacos的GitHub页面,提供有下载链接,可以下载编译好的Nacos服务端或者源代码: GitHub主页:https://github.com/alibaba/nacos GitHub的Release下载…...
java-Optional 类详解
目录 前言 Optional的构造方法 Optional的相关方法介绍 isPresent用法: get用法: filter用法: orElse用法: orElseGet用法 orElseThrow用法 map用法 flatMap用法: 前言 Optional 类是java8的新特性࿰…...
sql数据库怎么备份,sql 实时备份
在当今互联网时代,数据已经成为企业的核心资产。然而,数据的安全性和完整性面临硬件问题、软件故障、人工操作错误等各种威胁。为了保证数据的安全,实时备份已经成为公司必须采取的重要措施之一。下面我们就重点介绍SQL实时备份的重要实施方法…...
RK3399平台开发系列讲解(存储篇)Linux 存储系统的 I/O 栈
平台内核版本安卓版本RK3399Linux4.4Android7.1🚀返回专栏总目录 文章目录 一、Linux 存储系统全景二、Linux 存储系统的缓存沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将介绍 Linux 存储系统的 I/O 原理。 一、Linux 存储系统全景 我们可以把 Linux 存储系…...
Java“牵手”天猫淘口令转换API接口数据,天猫API接口申请指南
天猫平台商品淘口令接口是开放平台提供的一种API接口,通过调用API接口,开发者可以获取天猫商品的标题、价格、库存、商品快递费用,宝贝ID,发货地,区域ID,快递费用,月销量、总销量、库存、详情描…...
RWKV7-1.5B-G1A助力运维:利用Xshell脚本自动化模型部署与监控
RWKV7-1.5B-G1A助力运维:利用Xshell脚本自动化模型部署与监控 1. 引言 "又到周五下午4点,运维团队收到紧急需求——需要在10台服务器上部署最新的RWKV7-1.5B-G1A模型服务。"这样的场景对运维工程师来说再熟悉不过。传统的手动部署方式不仅耗…...
Z-Image-Turbo-辉夜巫女数据预处理实战:模拟VLOOKUP实现提示词与风格模板匹配
Z-Image-Turbo-辉夜巫女数据预处理实战:模拟VLOOKUP实现提示词与风格模板匹配 你有没有遇到过这样的烦恼?每次用AI画图,想生成一个“赛博朋克”风格的图片,都得重新回忆或者翻找之前写好的那一长串复杂的提示词。或者团队里每个人…...
GodotPckTool 终极指南:如何在命令行中高效管理Godot游戏资源包
GodotPckTool 终极指南:如何在命令行中高效管理Godot游戏资源包 【免费下载链接】GodotPckTool Standalone tool for extracting and creating Godot .pck files 项目地址: https://gitcode.com/gh_mirrors/go/GodotPckTool 你是否曾经需要在不启动Godot引擎…...
古基因组学:降解DNA的损伤模式、污染评估与群体历史推断
点击 “AladdinEdu,你的AI学习实践工作坊”,注册即送-H卡级别算力,沉浸式云原生集成开发环境,80G大显存多卡并行,按量弹性计费,教育用户更享超低价。 摘要:古基因组学通过对古代生物遗骸中高度降…...
Open UI5 源代码解析之736:CardBase.js
源代码仓库: https://github.com/SAP/openui5 源代码位置:src\sap.f\src\sap\f\CardBase.js CardBase.js 深度解析:在 OpenUI5 中承上启下的卡片基座 文件定位与整体判断 CardBase.js 位于 sap.f 库下,它不是面向业务开发者直接频繁实例化的组件,而是一个被多种卡片实…...
【Python并发革命】:GIL解除后首个生产级无锁插件生态正式开放下载(限时72小时)
第一章:Python并发革命的里程碑意义 Python 并发模型的演进并非渐进式改良,而是一场深刻重塑编程范式的革命。从早期依赖线程与锁的阻塞式模型,到 asyncio 的异步 I/O 抽象、async/await 语法糖的引入,再到结构化并发(…...
douyin-downloader:3大核心能力破解抖音内容高效下载难题
douyin-downloader:3大核心能力破解抖音内容高效下载难题 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback su…...
AD快捷键避坑指南:为什么你的自定义快捷键总是不生效?
AD快捷键避坑指南:为什么你的自定义快捷键总是不生效? 在AD(Altium Designer)这个功能强大的电子设计自动化软件中,快捷键是提升工作效率的利器。但很多用户都遇到过这样的困扰:明明按照教程设置了自定义快…...
卡证检测矫正模型中小企业降本:替代万元级专用证件扫描仪方案
卡证检测矫正模型:中小企业降本利器,替代万元级专用证件扫描仪方案 1. 引言:一个被忽视的降本痛点 如果你在中小企业负责行政、人事或财务,一定对下面这个场景不陌生:每天要处理一堆身份证、护照、驾照的复印件或扫描…...
SystemVerilog实战:在Vivado 2023.1中实现跨文件clog2计算的3种方法
SystemVerilog实战:在Vivado 2023.1中实现跨文件clog2计算的3种方法 当我们将传统Verilog项目迁移到SystemVerilog环境时,经常会遇到$clog2函数的兼容性问题。这个看似简单的对数计算函数,在不同工具链和文件类型中的表现可能大相径庭。特别是…...
