当前位置: 首页 > news >正文

相亲网站如何做自我介绍/百度经验手机版官网

相亲网站如何做自我介绍,百度经验手机版官网,做图片网站 解决版权,学网站开发在大学目录 前言阅读对象阅读导航前置知识课程内容一、使用Netty实现一个通信框架需要考虑什么问题二、通信框架功能设计2.1 功能描述2.2 通信模型2.3 消息体定义2.4 心跳机制2.5 重连机制*2.6 Handler的组织顺序2.7 交互式调试 三、代码实现:非必要。感兴趣的自行查看3.1…

目录

  • 前言
  • 阅读对象
  • 阅读导航
  • 前置知识
  • 课程内容
    • 一、使用Netty实现一个通信框架需要考虑什么问题
    • 二、通信框架功能设计
      • 2.1 功能描述
      • 2.2 通信模型
      • 2.3 消息体定义
      • 2.4 心跳机制
      • 2.5 重连机制
      • *2.6 Handler的组织顺序
      • 2.7 交互式调试
    • 三、代码实现:非必要。感兴趣的自行查看
      • 3.1 最外层的通信入口
        • 3.1.1 NettyRpcServer:服务端通信入口
        • 3.3 NettyRpcClient:客户端通信入口
        • 3.3 NettyRpcClient:交互式调试
      • 3.2 server包下
        • 3.2.1 ServerInitializer:服务端的Handler链化
        • 3.2.2 handler包下所有的handler
        • 3.2.3 helper包:工具包
        • 3.2.4 async包:异步处理类
      • 3.3 client包下
        • 3.3.1 ClientInitializer:客户端的Handler链化
        • 3.3.2 handler包下所有的handler
      • 3.4 common包下:一些公用的定义
        • 3.4.1 NettyConstant:一些公用常量
        • 3.4.2 helper包:工具包
        • 3.4.3 codec包:编解码(反)序列化工具
      • 3.4 biz包下:业务模拟
    • 四、业务流程图
  • 学习总结
  • 感谢

前言

阅读对象

  1. 有一定网络编程基础
  2. 了解Netty常用API

阅读导航

系列上一篇文章:《【Netty专题】Netty实战与核心组件详解》

前置知识

长连接:
长连接,也叫持久连接,是指在TCP层握手成功后,不立即断开连接,并在此连接的基础上进行多次消息(包括心跳)交互,直至连接的任意一方(客户端OR服务端)主动断开连接,此过程称为一次完整的长连接。
SOCKET 连接后不管是否使用都保持连接的一种连接

短连接:
短连接,顾名思义,与长连接的区别就是,客户端收到服务端的响应后,立刻发送FIN消息,主动释放连接。也有服务端主动断连的情况,凡是在一次消息交互(发请求-收响应)之后立刻断开连接的情况都称为短连接。短连接是建立在TCP协议上的,有完整的握手挥手流程,区别于UDP协议。(SOCKET 连接发送数据接收完数据后马上断开连接的一种连接

课程内容

一、使用Netty实现一个通信框架需要考虑什么问题

这里已经假设,我们确定使用Netty作为我们的网络编程框架了。但是我想,跟楼主一样的初学者,还是会有疑惑:那我想使用Netty开发一个简单的通信交互程序,该如何做到呢?
讲真的, 这个问题确实难倒了我。因为不在那个层次,我甚至连【需要关心什么问题】都搞不懂。思来想去,我只能以【没见过猪跑,但吃过猪肉】的视角出发, 大概说一下自己的想法了。
在这里插入图片描述

  1. 序列化和反序列化问题

Q1:什么是序列化、反序列化?
答:我们知道,数据在网络中传输不可能是原文传输的,人家机器设备只认得二进制01串。所以,把原文转换为字节流这个过程就是序列化;反之,则叫做反序列化。

Q2:序列化有什么作用?
答:主要目的有:【网络传输】及【对象持久化保存】。持久化保存知道啥意思吧,就是存到各种数据库中。

  1. 编解码问题

Q3:什么是编解码问题?
答:编解码就是将一个格式转变为另一个格式的过程。比如:MP3格式转为MP4,JSON转为XML。在我们这里,就是将字节流反序列化后的数据,如何转变为我们Java应用程序识别的数据结构

  1. 心跳保活机制

摘抄自【百度:文心一言】。
心跳保活机制是一种维持网络连接长连接的机制,它通过定时发送心跳包来检测双方是否存活。如果没有特意的设置某些选项或者实现应用层心跳包,TCP空闲的时候是不会发送任何数据包。也就是说,当一个TCP的socket,客户端与服务端谁也不发送数据,会一直保持着连接。这其中如果有一方异常掉线(例如死机、路由被破坏、防火墙切断连接等),另一端如果没有发送数据,永远也不可能知道。这对于一些服务型的程序来说,是灾难性的后果,将会导致服务端socket资源耗尽。

因此,需要心跳保活机制来维持连接的有效性,及时有效地检测到一方的非正常断开,保证连接的资源被有效的利用。心跳保活机制可以应用在TCP协议层实现(例如使用TCP Keepalive),也可以在应用层实现(例如使用心跳包)。在应用层实现心跳保活机制时,通常由客户端向服务端发送自定义消息命令,服务端收到消息后回复自定义的消息给客户端。如果服务端未收到消息,则表示连接失败,如果失败的次数达到指定上限后,则重新发起连接。

  1. 公共消息体定义

这个无需多说,哪怕是我们在做web开发期间也会定义一个统一口径

特别提醒
个人在学习的时候发现网上对于【序列化】和【编解码】这两个概念多少有点混淆,或者说边界模糊。确实,他们是有些关联的,但区别也有。以下是摘抄自【百度:文心一言】

编码器和解码器常用于处理数据在不同格式之间的转换;
而序列化是将数据结构或对象状态转换为可以存储或传输的格式的过程。
因此,编码器和解码器主要关注的是数据的表示和转换,而序列化和反序列化主要关注的是对象状态的转换。

二、通信框架功能设计

2.1 功能描述

我在后面的编码中,将围绕以下功能来实现一个简单的长连接通信框架。功能如下:

  1. 基于 Netty 的 NIO 通信框架
  2. 提供消息的编解码框架,可以实现 POJO 的序列化和反序列化(【编解码】与【序列化】一块)
  3. 消息内容防篡改机制(就跟我们web开发的鉴权一样,在处理之前先校验一下内容合法性)
  4. 提供基于 IP 地址的白名单接入认证机制
  5. 断线重连机制

2.2 通信模型

在这里插入图片描述
模型解读如下:
1)客户端发送应用握手请求消息,携带节点 ID 等有效身份认证信息
2)服务端对应用握手请求消息进行合法性校验,包括节点 ID 有效性校验、节点重复登录校验和 IP 地址合法性校验,校验通过后,返回登录成功的应用握手应答消息
3)链路建立成功之后,客户端发送业务消息
4)链路成功之后,服务端发送心跳消息
5)链路建立成功之后,客户端发送心跳消息
6)链路建立成功之后,服务端发送业务消息
7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接

PS:协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,所以通信方式有如下两种:

  • TWO-WAY:即需要响应的请求。如请求登录
  • ONE-WAY:即无需响应的请求。如日志记录

双方之间的心跳采用 Ping-Pong 机制,当链路处于空闲状态时,客户端主动发送Ping 消息给服务端,服务端接收到 Ping 消息后发送应答消息 Pong 给客户端,如果客户端连续发送 N 条 Ping 消息都没有接收到服务端返回的 Pong 消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期 T 后发起重连操作,直到重连成功。

2.3 消息体定义

在我的设计中,把消息定义分为了两个部分:消息头、消息体。代码如下:

@Getter
@Setter
@ToString
public class CommonMessage {/*** 消息头*/private CommonMessageHeader header;/*** 0-失败;1-成功*/private Byte result;/*** 消息体*/private Object body;
}@Getter
@Setter
@ToString
public final class CommonMessageHeader {/*** 消息体的MD5摘要,用来做简单校验*/private String md5;/*** 服务标识*/private int severId;/*** 消息id*/private long msgID;/*** 消息类型,枚举值。见MessageType*/private byte type;
}

然后是消息类型:

public enum MessageType {/*** 业务请求消息*/SERVICE_REQ((byte) 0),/*** 业务应答消息*/SERVICE_RESP((byte) 1),/*** 无需应答的业务请求消息*/SERVICE_REQ_ONE_WAY((byte) 2),/*** 心跳请求消息*/HEARTBEAT_REQ((byte) 99),/*** 心跳应答消息*/HEARTBEAT_RESP((byte) 100),;private byte value;MessageType(byte value) {this.value = value;}public byte value() {return this.value;}
}

2.4 心跳机制

心跳机制我估计大家多少能理解,这个名字就起的很形象。当读或者写心跳消息发生 I/O 异常的时候,说明已经中断,此时需要立即关闭连接,如果是客户端,需要重新发起连接;如果是服务端,需要清空缓存的半包信息,等到客户端重连。

是的,两边都需要心跳检测,毕竟是【全双工】

但是心跳机制的设计,也是有点说法的。比如,什么时候需要传心跳包过去;发什么包过去。

先说发什么包过去。这个就比较简单了,正常来说发一个空包就行了,除非你有什么特别的要求。比如我就在消息定义中新增了一个类型简单标记一下而已。
在这里插入图片描述
再说,什么时候发。

方案一:最粗暴
最粗暴的当然是,TCP握手完成之后开始启动一个心跳任务,然后以固定的频率发送,不管三次二十一,我就要在存续期间一直发。这当然可以实现目标,但是,这【合李】吗?

方案二:小改进
很简单的道理啊,如果我们互相之间本身就正在进行业务上的通信,咱俩都正在【说话】呢,你还发个心跳过来问我【你死没死】啊,你礼貌吗这?所以,我们可以使用Netty提供的一个【写空闲检测】机制来完成。直接上源码给你们看:

// IdleStateHandler。一个实现了InBound和OutBound的Handler
public IdleStateHandler(int readerIdleTimeSeconds,int writerIdleTimeSeconds,int allIdleTimeSeconds) {this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,TimeUnit.SECONDS);}

参数解读:

  • readerIdleTimeSeconds:当在指定的时间段内没有执行读操作时,将触发 IdleState.READER_IDLE。0表示禁用
  • writerIdleTimeSeconds:当在指定的时间段内没有执行写操作时,将触发IdleState.WRITER_IDLE。0表示禁用
  • allIdleTimeSeconds:当在指定的时间段内没有进行读写操作时,将触发IdleState.ALL_IDLE。0表示禁用

PS:检测空闲连接以及超时对于及时释放资源来说是至关重要的。这就是心跳机制做的事情。因为很重要,所以Netty也给我们预提供了这些Handler,就是上面说的IdleStateHandler

2.5 重连机制

如果链路中断,等到INTERVAL时间后,由客户端发起重连操作,如果重连失败,间隔周期INTERVAL后再次发起重连,直到重连成功。
为了保持服务端能够有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL时间之后再发起重连,而不是失败后立即重连。
为了保证句柄资源能够及时释放,无论什么场景下重连失败,客户端必须保证自身的资源被及时释放,包括但不限于SocketChannel、Socket 等。
重连失败后,可以打印异常堆栈信息,方便后续的问题定位。

*2.6 Handler的组织顺序

大家还记得吗?Handler出入境可以是无序的,但是,同是入境、出境的Handler之间是局部有序的。这不难理解,就跟JDK8的Stream一样,前面对流的操作会影响后面的结果。所以,顺序很重要。这边大概的模型如下:
在这里插入图片描述
我想,大家应该能理解为什么我的顺序是这样组织的吧…

  1. 写空闲监控或者性能监控放前面没毛病,正常来说这个业务不会去操作原始报文
  2. 粘包半包处理。开始对包数据做拆分了,这一步肯定要在所有需要操作【业务报文】的前面做。为啥?我都没拆包给你呢,你咋知道这个就是你要的
  3. 序列化反序列化。正常【2】之后拿到的就是【字节流业务报文】,这个时候需要先【序列化/反序列化】再【编解码】(这两步我合在一起做了)
  4. 【读空闲】即:心跳机制。 【心跳】跟【认证申请/检查】需要分客户端跟服务端。客户端都没有登录呢,没必要开启【心跳】对吧,所以客户端的【心跳】在【认证申请】之后;服务端我就不多解释了吧
  5. 【心跳请求/应答】也是心跳机制里面的东西
  6. 业务处理

大家好好思考下,理论上5/6是可以随意调换顺序的,毕竟【心跳包】是一种特殊的业务

我估计有很多人不理解【读空闲】跟【心跳请求/应答】的关系,大家可以再看看【2.4 心跳机制】的【方案二】。我说他们都是心跳机制里面的,怎么理解?

  • 读空闲:其实就是一种事件监听机制。监听Channel上的【读事件】。当事件发生的时候触发对应事件,并且往管道中传输(说到这里了,写空闲也知道了吧)
  • 心跳请求/应答:对上面说的事件的响应

2.7 交互式调试

写了通信的客户端、服务端怎么调试呢?咱也没有可视化界面,所以我就搞了一个最原始的,通过Scanner输入命令的交互式调试方案。像这样:
在这里插入图片描述
你懂我意思吧在这里插入图片描述

三、代码实现:非必要。感兴趣的自行查看

头疼。代码貌似也挺多的,本来想打包压缩包上来,然后只贴核心代码。但是我的电脑有加密软件,打包上来的代码可能会有问题。代码包结构如下:
在这里插入图片描述
红框内的pojo已经给过了,咱就不重复上代码了。如果大家要本地运行的话,可以跟我一样先创建好对应的包,然后一个一个复制上去吧。真不是我不想给源码压缩包,而是加密问题。给了你们打开也是乱码

3.1 最外层的通信入口

在这里插入图片描述
这三个类。

3.1.1 NettyRpcServer:服务端通信入口
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.server.ServerInitializer;/*** RPC Server服务端** @author zhangshen* @date 2023/10/25 9:16* @slogan 编码即学习,注释断语义**/
@Slf4j
public class NettyRpcServer {public static void main(String[] args) {try {start();} catch (InterruptedException e) {e.printStackTrace();log.error("【Netty服务器】启动失败");}}private static void start() throws InterruptedException {EventLoopGroup acceptLoopGroup = new NioEventLoopGroup();EventLoopGroup reactorLoopGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(acceptLoopGroup, reactorLoopGroup).channel(NioServerSocketChannel.class).localAddress(NettyConstant.PORT).childHandler(new ServerInitializer());serverBootstrap.bind().sync();log.info("【Netty服务器】启动成功");}
}

上面这个很简单啦,跟上一篇文章的使用示例如出一辙。不同的是,在Pipeline中链化Handler的逻辑我单独抽出来写成一个ServerInitializer。后面会讲

3.3 NettyRpcClient:客户端通信入口
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.client.ClientInitializer;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.MessageType;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** RPC Client客户端** @author zhangshen* @date 2023/10/25 9:16* @slogan 编码即学习,注释断语义**/
@Slf4j
public class NettyRpcClient {private Channel channel;private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();private volatile boolean userClose = false;/*** 定时线程池,用于断线重连*/private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);public void connect() throws InterruptedException {try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ClientInitializer());ChannelFuture sync = bootstrap.connect(new InetSocketAddress(NettyConstant.SERVER_IP, NettyConstant.PORT)).sync();log.info("【Netty客户端】启动成功");channel = sync.channel();channel.closeFuture().sync();} finally {if (userClose) {channel = null;eventLoopGroup.shutdownGracefully().sync();} else {// 断线重连reconect();}}}/*** 断线重连*/private void reconect() {log.info("【Netty客户端】开始断线重连");executor.execute(() -> {try {// 给操作系统足够的时间,去释放相关的资源TimeUnit.SECONDS.sleep(1);connect();} catch (InterruptedException e) {e.printStackTrace();}});}public void sendMessage(Object msg) {if (channel == null || !channel.isActive()) {throw new IllegalStateException("和服务器还未未建立起有效连接,请稍后再试!!");}CommonMessage message = MessageGenerateHelper.requestWithMsgId(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_REQ.value(), msg);log.info("【Netty客户端】发送消息。CommonMessage={}", message);channel.writeAndFlush(message);}public void sendOneWay(Object msg) {if (channel == null || !channel.isActive()) {throw new IllegalStateException("和服务器还未未建立起有效连接,请稍后再试!!");}CommonMessage message = MessageGenerateHelper.requestWithMsgId(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_REQ_ONE_WAY.value(), msg);log.info("【Netty客户端】发送消息。CommonMessage={}", message);channel.writeAndFlush(message);}public void close() {userClose = true;channel.close();}
}

在这里插入图片描述
这个相对于服务端,以及前面的使用示例的客户端确实稍微复杂一点。主要的变化是【断线重连】机制引起的:

  1. 引入了【定时线程池】,定时调用reconnect方法
  2. 新增了reconnect()方法,处理断线重连
  3. 【断线重连】还得看是不是自己主动发起的【关闭】,如果是自己主动发起的关闭肯定不能重连啊
3.3 NettyRpcClient:交互式调试
import java.util.Scanner;/*** @author zhangshen* @date 2023/10/25 12:31* @slogan 编码即学习,注释断语义**/
public class ScannerCmdClient {public static void main(String[] args) throws InterruptedException {// 新建客户端NettyRpcClient client = new NettyRpcClient();// 显示菜单栏showMenu();Scanner scanner = new Scanner(System.in);while (true) {int cmd = scanner.nextInt();switch (cmd) {case 1:client.connect();Thread.sleep(3000);break;case 2:client.sendMessage("客户端发送双端信息");break;case 3:client.sendOneWay("客户端发送ONE-WAY信息");break;case 4:client.close();case 5:showMenu();default:client.close();}}}/*** 展示菜单*/private static void showMenu() {System.out.println("请选择以下功能:");System.out.println("【1】与服务端建立连接");System.out.println("【2】发送一个有响应的消息");System.out.println("【3】发送一个无响应的消息");System.out.println("【4】关闭连接");System.out.println("【5】显示菜单栏");}
}

前面介绍过作用了,不多说了

3.2 server包下

3.2.1 ServerInitializer:服务端的Handler链化
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.codec.KryoDecodeHandler;
import org.tuling.io.rpc.common.codec.KryoEncodeHandler;
import org.tuling.io.rpc.server.handler.ServerHeartBeatHandler;
import org.tuling.io.rpc.server.handler.ServerLoginHandler;
import org.tuling.io.rpc.server.handler.ServerOrderHandler;/*** 服务端,通道初始化器** @author zhangshen* @date 2023/10/25 9:44* @slogan 编码即学习,注释断语义**/
public class ServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 添加【粘包/分包】处理器。 由Netty预备提供的pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0,2));pipeline.addLast(new LengthFieldPrepender(2));// 添加【序列化/反序列化】处理器,开源序列化工具pipeline.addLast(new KryoDecodeHandler());pipeline.addLast(new KryoEncodeHandler());// 添加【心跳】处理器,Netty预备提供的pipeline.addLast(new ReadTimeoutHandler(NettyConstant.HEARBEAT_FREQUENCY));// 添加业务处理器pipeline.addLast(new ServerLoginHandler());pipeline.addLast(new ServerHeartBeatHandler());pipeline.addLast(new ServerOrderHandler());}
}
3.2.2 handler包下所有的handler

在这里插入图片描述
我就不一一说代码了,只贴。没啥难度的,最重要的还是顺序,在【2.6 Handler的顺序组织】已经解释了一波了。

ServerHeartBeatHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.helper.SecurityCenterHelper;/*** 服务器心跳处理** @author zhangshen* @date 2023/10/25 10:33* @slogan 编码即学习,注释断语义**/
@Slf4j
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CommonMessage message = (CommonMessage) msg;CommonMessageHeader header = message.getHeader();if (header == null) {log.error("【Netty服务器】非法消息");ctx.writeAndFlush("非法消息");ctx.close();ReferenceCountUtil.release(msg);return;}if (header.getType() != MessageType.HEARTBEAT_REQ.value()) {ctx.fireChannelRead(msg);return;}// 处理心跳业务log.info("【Netty服务器】心跳应答");CommonMessage heartBeatResponse = MessageGenerateHelper.success(-1, MessageType.HEARTBEAT_RESP.value(), null);ctx.writeAndFlush(heartBeatResponse);ReferenceCountUtil.release(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof ReadTimeoutException){log.debug("【Netty服务器】客户端长时间未通信,可能已经宕机,关闭链路");SecurityCenterHelper.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}super.exceptionCaught(ctx, cause);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("【Netty服务器】客户端已关闭连接");super.channelInactive(ctx);}
}

ServerLoginHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.helper.SecurityCenterHelper;import java.net.InetSocketAddress;/*** 登录服务器处理器** @author zhangshen* @date 2023/10/25 10:05* @slogan 编码即学习,注释断语义**/
@Slf4j
public class ServerLoginHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CommonMessage message = (CommonMessage) msg;CommonMessageHeader header = message.getHeader();if (header == null) {log.error("【Netty服务器】非法消息");ctx.writeAndFlush("非法消息");ctx.close();ReferenceCountUtil.release(msg);return;}if (header.getSeverId() != NettyConstant.LOGIN_SERVER_ID) {ctx.fireChannelRead(msg);return;}// 处理登录业务this.checkLogin(ctx, msg);ReferenceCountUtil.release(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 删除缓存SecurityCenterHelper.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}private void checkLogin(ChannelHandlerContext ctx, Object msg) {log.info("【Netty服务器】登录消息CommonMessage={}", msg);InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();String userLoginIP = socketAddress.getAddress().getHostAddress();// 白名单校验boolean whiteIP = SecurityCenterHelper.isWhiteIP(userLoginIP);if (!whiteIP) {String errorMessage = "不在白名单内";log.error("【Netty服务器】{}", errorMessage);CommonMessage message = MessageGenerateHelper.fail(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), errorMessage);ctx.writeAndFlush(message);ctx.close();ReferenceCountUtil.release(msg);return;}// 重复登录校验String userInfo = ctx.channel().remoteAddress().toString();boolean repeatLogin = SecurityCenterHelper.isRepeatLogin(userInfo);if (repeatLogin) {String errorMessage = "重复登录";log.error("【Netty服务器】{}", errorMessage);CommonMessage message = MessageGenerateHelper.fail(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), errorMessage);ctx.writeAndFlush(message);ctx.close();ReferenceCountUtil.release(msg);return;}// 通过校验,记录SecurityCenterHelper.addLoginUser(userInfo);String successMessage = "登录成功";log.info("【Netty服务器】{}", successMessage);CommonMessage message = MessageGenerateHelper.success(NettyConstant.LOGIN_SERVER_ID, MessageType.SERVICE_RESP.value(), successMessage);ctx.writeAndFlush(message);}
}

ServerOrderHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.biz.OrderInfo;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.EncryptHelper;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.async.AsyncBusiProcessor;import java.math.BigDecimal;/*** 订单业务处理类** @author zhangshen* @date 2023/10/25 10:39* @slogan 编码即学习,注释断语义**/
@Slf4j
public class ServerOrderHandler extends SimpleChannelInboundHandler<CommonMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, CommonMessage msg) throws Exception {// 检查MD5final CommonMessageHeader header = msg.getHeader();if (header == null) {log.error("【Netty服务器】非法消息");ctx.writeAndFlush("非法消息");ctx.close();ReferenceCountUtil.release(msg);return;}if (header.getSeverId() != NettyConstant.ORDER_SERVER_ID) {ctx.fireChannelRead(msg);return;}log.info("【Netty服务器】CommonMessage={}", msg);String headMd5 = header.getMd5();String calcMd5 = EncryptHelper.encryptObj(msg.getBody());if (!headMd5.equals(calcMd5)) {log.error("【Netty服务器】报文md5检查不通过:" + headMd5 + " vs " + calcMd5 + ",关闭连接");CommonMessage message = MessageGenerateHelper.fail(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(),"报文md5检查不通过,关闭连接");ctx.writeAndFlush(message);ctx.close();}log.info(msg.toString());if (header.getType() == MessageType.SERVICE_REQ_ONE_WAY.value()) {log.debug("【Netty服务器】ONE_WAY类型消息,异步处理");AsyncBusiProcessor.submitTask(() -> {log.info("【Netty服务器】模仿异步,ONE_WEY业务处理");});} else {log.debug("【Netty服务器】TWO_WAY类型消息,应答");OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId("123456");orderInfo.setProductCount(2);orderInfo.setAmount(BigDecimal.valueOf(1499.99));CommonMessage message = MessageGenerateHelper.success(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), orderInfo);ctx.writeAndFlush(message);}}
}
3.2.3 helper包:工具包

helper包就是utils工具包。我喜欢叫做helper而已。里面只有一个类SecurityCenterHelper。用来实现【白名单】,还有【重复登录校验】机制。

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;/*** 登录安全校验助手** @author zhangshen* @date 2023/10/25 10:14* @slogan 编码即学习,注释断语义**/
public class SecurityCenterHelper {/*** 用以检查用户是否重复登录的缓存*/private static Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();/*** 用户登录的白名单*/private static Set<String> whiteList = new CopyOnWriteArraySet<>();static {whiteList.add("127.0.0.1");}/*** 是否白名单内*/public static boolean isWhiteIP(String ip) {return whiteList.contains(ip);}/*** 给定用户信息是否重复登录*/public static boolean isRepeatLogin(String usrInfo) {return nodeCheck.containsKey(usrInfo);}/*** 添加登录用户信息*/public static void addLoginUser(String usrInfo) {nodeCheck.put(usrInfo, true);}/*** 移除登录用户信息*/public static void removeLoginUser(String usrInfo) {nodeCheck.remove(usrInfo, true);}}
3.2.4 async包:异步处理类

里面只有一个类AsyncBusiProcessor,用来处理需要【异步】的任务。

import io.netty.util.NettyRuntime;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** 异步业务处理器。某些消息可以异步处理,比如ONE_WAY类型消息** @author zhangshen* @date 2023/10/25 10:56* @slogan 编码即学习,注释断语义**/
@Slf4j
public class AsyncBusiProcessor {private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(3000);private static final ExecutorService executorService;static {int cores = NettyRuntime.availableProcessors();executorService = new ThreadPoolExecutor(1,cores,60,TimeUnit.SECONDS,taskQueue);}/*** 提交异步执行的任务** @param task 任务*/public static void submitTask(Runnable task) {executorService.execute(task);}
}

3.3 client包下

Client包下跟Server包下的东西其实差不多,大家自行理解

3.3.1 ClientInitializer:客户端的Handler链化
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.tuling.io.rpc.client.handler.CheckWriteIdleHandler;
import org.tuling.io.rpc.client.handler.ClientHeartBeatHandler;
import org.tuling.io.rpc.client.handler.ClientLoginHandler;
import org.tuling.io.rpc.client.handler.ClientOrderHandler;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.codec.KryoDecodeHandler;
import org.tuling.io.rpc.common.codec.KryoEncodeHandler;/*** 客户端,通道初始化器** @author zhangshen* @date 2023/10/25 9:44* @slogan 编码即学习,注释断语义**/
public class ClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 写空闲自己检测pipeline.addLast(new CheckWriteIdleHandler());// 添加【粘包/分包】处理器。 由Netty预备提供的pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0,2));pipeline.addLast(new LengthFieldPrepender(2));// 添加【序列化/反序列化】处理器,开源序列化工具pipeline.addLast(new KryoDecodeHandler());pipeline.addLast(new KryoEncodeHandler());// 添加登录处理器// 登录处理器需放在心跳前面pipeline.addLast(new ClientLoginHandler());// 添加【心跳】处理器,Netty预备提供的pipeline.addLast(new ReadTimeoutHandler(NettyConstant.HEARBEAT_FREQUENCY));pipeline.addLast(new ClientHeartBeatHandler());pipeline.addLast(new ClientOrderHandler());}
}
3.3.2 handler包下所有的handler

CheckWriteIdleHandler:客户端写空闲检测

import io.netty.handler.timeout.IdleStateHandler;/*** 客户端检测自己的写空闲** @author zhangshen* @date 2023/10/25 11:08* @slogan 编码即学习,注释断语义**/
public class CheckWriteIdleHandler extends IdleStateHandler {public CheckWriteIdleHandler() {super(0, 8, 0);}
}

ClientHeartBeatHandler:客户端心跳处理

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;/*** 客户端在长久未向服务器业务请求时,发出心跳请求报文** @author zhangshen* @date 2023/10/25 11:33* @slogan 编码即学习,注释断语义**/
@Slf4j
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {CommonMessage request = MessageGenerateHelper.request(-1, MessageType.HEARTBEAT_REQ.value(), null);log.debug("【Netty客户端】写空闲,发出心跳报文维持连接: " + request);ctx.writeAndFlush(request);}super.userEventTriggered(ctx, evt);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CommonMessage message = (CommonMessage) msg;CommonMessageHeader header = message.getHeader();if (header != null&& header.getType() == MessageType.HEARTBEAT_RESP.value()) {log.debug("【Netty客户端】收到服务器心跳应答,服务器正常");ReferenceCountUtil.release(msg);}  else {ctx.fireChannelRead(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof ReadTimeoutException) {log.debug("【Netty客户端】服务器长时间未应答,关闭链路");}super.exceptionCaught(ctx, cause);}
}

ClientLoginHandler:客户端登录请求。TCP三次握手成功之后就请求

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;/*** 客户端,发起登录请求** @author zhangshen* @date 2023/10/25 11:11* @slogan 编码即学习,注释断语义**/
@Slf4j
public class ClientLoginHandler extends ChannelInboundHandlerAdapter {public void channelActive(ChannelHandlerContext ctx) throws Exception {// TCP三次握手完成,发出认证请求CommonMessage message = MessageGenerateHelper.request(NettyConstant.LOGIN_SERVER_ID, MessageType.SERVICE_REQ.value(), null);log.info("【Netty客户端】请求服务器认证 : " + message);ctx.writeAndFlush(message);}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CommonMessage message = (CommonMessage) msg;CommonMessageHeader header = message.getHeader();if (header != null&& header.getType() == MessageType.SERVICE_RESP.value()&& header.getSeverId() == NettyConstant.LOGIN_SERVER_ID) {log.info("【Netty客户端】收到认证应答报文,服务器是否验证通过?");byte loginResult = message.getResult();if (loginResult != 1) {// 握手失败,关闭连接log.debug("【Netty客户端】未通过认证,关闭连接: " + message);ctx.close();} else {log.info("【Netty客户端】通过认证,移除本处理器,进入业务通信 : " + message);ctx.pipeline().remove(this);ReferenceCountUtil.release(msg);}} else {ctx.fireChannelRead(msg);}}
}

ClientOrderHandler:瞎写的一个业务拓展类,目前只是打印。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.pojo.CommonMessage;/*** @author Mark老师* 类说明:接收业务应答消息并处理*/
@Slf4j
public class ClientOrderHandler extends SimpleChannelInboundHandler<CommonMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, CommonMessage msg) throws Exception {log.info("【Netty客户端】业务应答消息:" + msg.toString());ReferenceCountUtil.release(msg);}
}

3.4 common包下:一些公用的定义

pojo的我就不贴了,看【2.3 消息体定义】

3.4.1 NettyConstant:一些公用常量
/*** 常量定义** @author zhangshen* @date 2023/10/25 9:41* @slogan 编码即学习,注释断语义**/
public interface NettyConstant {/*** 程序绑定端口*/int PORT = 8585;/*** 程序ip地址*/String SERVER_IP = "127.0.0.1";/*** 成功*/byte SUCCESS = 1;/*** 失败*/byte FAIL = 0;/*** 心跳检测频率* 单位:秒*/int HEARBEAT_FREQUENCY = 15;/*** 登录服务器标识*/int LOGIN_SERVER_ID = 1;/*** 订单服务器标识*/int ORDER_SERVER_ID = 2;
}
3.4.2 helper包:工具包

EncryptHelper:防篡改的加密摘要

import org.tuling.io.rpc.common.codec.KryoSerializer;import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;/*** 摘要的工具类** @author zhangshen* @date 2023/10/25 10:47* @slogan 编码即学习,注释断语义**/
public class EncryptHelper {/*** 加密信息** @param strSrc  需要被摘要的字符串* @param encName 摘要方式,有 MD5、SHA-1和SHA-256 这三种,缺省为MD5* @return 返回摘要字符串*/private static String EncryptStr(String strSrc, String encName) {MessageDigest md = null;String strDes = null;byte[] bt = strSrc.getBytes();try {if (encName == null || encName.equals("")) {encName = "MD5";}md = MessageDigest.getInstance(encName);md.update(bt);strDes = bytes2Hex(md.digest()); // to HexString} catch (NoSuchAlgorithmException e) {System.out.println("Invalid algorithm.");return null;}return strDes;}/*** MD5摘要** @param str 需要被摘要的字符串* @return 对字符串str进行MD5摘要后,将摘要字符串返回*/public static String EncryptByMD5(String str) {return EncryptStr(str, "MD5");}/*** SHA1摘要** @param str 需要被摘要的字符串* @return 对字符串str进行SHA-1摘要后,将摘要字符串返回*/public static String EncryptBySHA1(String str) {return EncryptStr(str, "SHA-1");}/*** SHA256摘要** @param str 需要被摘要的字符串* @return 对字符串str进行SHA-256摘要后,将摘要字符串返回*/public static String EncryptBySHA256(String str) {return EncryptStr(str, "SHA-256");}/*** 字节转十六进制,结果以字符串形式呈现*/private static String bytes2Hex(byte[] bts) {String des = "";String tmp = null;for (int i = 0; i < bts.length; i++) {tmp = (Integer.toHexString(bts[i] & 0xFF));if (tmp.length() == 1) {des += "0";}des += tmp;}return des;}/*** 对字符串进行MD5加盐摘要* 先将str进行一次MD5摘要,摘要后再取摘要后的字符串的第1、3、5个字符追加到摘要串,* 再拿这个摘要串再次进行摘要*/public static String encrypt(String str) {String encryptStr = EncryptByMD5(str);if (encryptStr != null) {encryptStr = encryptStr + encryptStr.charAt(0) + encryptStr.charAt(2) + encryptStr.charAt(4);encryptStr = EncryptByMD5(encryptStr);}return encryptStr;}/*** 对对象进行MD5摘要,先对对象进行序列化,转为byte数组,* 再将byte数组转为字符串,然后进行MD5加盐摘要*/public static String encryptObj(Object o) {return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o)));}
}

MessageGenerateHelper:消息生成工具,消除代码重复用的

import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;import java.util.concurrent.atomic.AtomicLong;/*** 消息生成助手** @author zhangshen* @date 2023/10/25 11:21* @slogan 编码即学习,注释断语义**/
public class MessageGenerateHelper {private static AtomicLong msgId = new AtomicLong(1);public static long getID() {return msgId.getAndIncrement();}/*** 构建成功的业务消息*/public static CommonMessage success(int serverId, byte type, Object msg) {CommonMessage message = new CommonMessage();CommonMessageHeader header = getHeader(serverId, type);message.setHeader(header);message.setResult(NettyConstant.SUCCESS);message.setBody(msg);return message;}/*** 构建失败的业务消息*/public static CommonMessage fail(int serverId, byte type, Object msg) {CommonMessage message = new CommonMessage();CommonMessageHeader header = getHeader(serverId, type);message.setHeader(header);message.setResult(NettyConstant.FAIL);message.setBody(msg);return message;}/*** 构建请求业务消息*/public static CommonMessage request(int serverId, byte type, Object msg) {CommonMessage message = new CommonMessage();CommonMessageHeader header = getHeader(serverId, type);message.setHeader(header);message.setBody(msg);return message;}/*** 构建请求业务消息*/public static CommonMessage requestWithMsgId(int serverId, byte type, Object msg) {CommonMessage message = new CommonMessage();CommonMessageHeader header = getHeader(serverId, type);header.setMsgID(getID());header.setMd5(EncryptHelper.encryptObj(msg));message.setHeader(header);message.setBody(msg);return message;}private static CommonMessageHeader getHeader(int serverId, byte type) {CommonMessageHeader header = new CommonMessageHeader();header.setSeverId(serverId);header.setType(type);return header;}
}
3.4.3 codec包:编解码(反)序列化工具

这里面的是一个基于Kryo编解码序列化API实现的。pom.xml如下:

        <dependency><groupId>de.javakaffee</groupId><artifactId>kryo-serializers</artifactId><version>0.42</version></dependency>

当然大家可以使用其他的API,我这里是抄的。

KryoFactory:Kryo实例,API要求的

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import de.javakaffee.kryoserializers.*;import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;/*** Kryo的工厂,拿到Kryo的实例** @author zhanghuitong* @date 2023/10/25 20:12* @slogan 编码即学习,注释断语义**/
public class KryoFactory {public static Kryo createKryo() {Kryo kryo = new Kryo();kryo.setRegistrationRequired(false);kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());kryo.register(InvocationHandler.class, new JdkProxySerializer());kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());kryo.register(Pattern.class, new RegexSerializer());kryo.register(BitSet.class, new BitSetSerializer());kryo.register(URI.class, new URISerializer());kryo.register(UUID.class, new UUIDSerializer());UnmodifiableCollectionsSerializer.registerSerializers(kryo);SynchronizedCollectionsSerializer.registerSerializers(kryo);kryo.register(HashMap.class);kryo.register(ArrayList.class);kryo.register(LinkedList.class);kryo.register(HashSet.class);kryo.register(TreeSet.class);kryo.register(Hashtable.class);kryo.register(Date.class);kryo.register(Calendar.class);kryo.register(ConcurrentHashMap.class);kryo.register(SimpleDateFormat.class);kryo.register(GregorianCalendar.class);kryo.register(Vector.class);kryo.register(BitSet.class);kryo.register(StringBuffer.class);kryo.register(StringBuilder.class);kryo.register(Object.class);kryo.register(Object[].class);kryo.register(String[].class);kryo.register(byte[].class);kryo.register(char[].class);kryo.register(int[].class);kryo.register(float[].class);kryo.register(double[].class);return kryo;}
}

KryoSerializer序列化工具:

/*** Kryo的序列化器,负责序列化和反序列化** @author zhanghuitong* @date 2023/10/25 20:12* @slogan 编码即学习,注释断语义**/
public class KryoSerializer {private static Kryo kryo = KryoFactory.createKryo();/*序列化*/public static void serialize(Object object, ByteBuf out) {ByteArrayOutputStream baos = new ByteArrayOutputStream();Output output = new Output(baos);kryo.writeClassAndObject(output, object);output.flush();output.close();byte[] b = baos.toByteArray();try {baos.flush();baos.close();} catch (IOException e) {e.printStackTrace();}out.writeBytes(b);}/*序列化为一个字节数组,主要用在消息摘要上*/public static byte[] obj2Bytes(Object object) {ByteArrayOutputStream baos = new ByteArrayOutputStream();Output output = new Output(baos);kryo.writeClassAndObject(output, object);output.flush();output.close();byte[] b = baos.toByteArray();try {baos.flush();baos.close();} catch (IOException e) {e.printStackTrace();}return b;}/*反序列化*/public static Object deserialize(ByteBuf out) {if (out == null) {return null;}Input input = new Input(new ByteBufInputStream(out));return kryo.readClassAndObject(input);}
}

KryoEncodeHandler:编码处理器,实现了NettyMessageToByteEncoder接口的Handler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.tuling.io.rpc.common.pojo.CommonMessage;/*** 序列化的Handler** @author zhangshen* @date 2023/10/25 9:54* @slogan 编码即学习,注释断语义**/
public class KryoEncodeHandler extends MessageToByteEncoder<CommonMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, CommonMessage message, ByteBuf out) throws Exception {KryoSerializer.serialize(message, out);ctx.flush();}
}

KryoDecodeHandler:解码处理器,实现了NettyMessageToByteEncoder接口的Handler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;/*** 反序列化的Handler** @author zhangshen* @date 2023/10/25 9:54* @slogan 编码即学习,注释断语义**/
public class KryoDecodeHandler extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object obj = KryoSerializer.deserialize(in);out.add(obj);}
}

3.4 biz包下:业务模拟

下面只有一个类,OrderInfo

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;import java.math.BigDecimal;/*** 订单信息** @author zhangshen* @date 2023/10/25 10:44* @slogan 编码即学习,注释断语义**/
@Getter
@Setter
@ToString
public class OrderInfo {private String orderId;private Integer productCount;private BigDecimal amount;
}

四、业务流程图

在这里插入图片描述

学习总结

  1. 使用Netty写了一个简单的通信示例

感谢

感谢【百度:文心一言】

相关文章:

【Netty专题】用Netty手写一个远程长连接通信框架

目录 前言阅读对象阅读导航前置知识课程内容一、使用Netty实现一个通信框架需要考虑什么问题二、通信框架功能设计2.1 功能描述2.2 通信模型2.3 消息体定义2.4 心跳机制2.5 重连机制*2.6 Handler的组织顺序2.7 交互式调试 三、代码实现&#xff1a;非必要。感兴趣的自行查看3.1…...

注册商标被宣告为无效的5种情形

1.与已注册商标过于相似&#xff1a;商标法规定商标应具备独立性和显著性&#xff0c;能够与已注册商标有效区分开来。如果新申请商标与已注册商标过于相似&#xff0c;可能会导致商标无效。相似性包括外观形状、字母组合、发音或含义等方面的相似度。 2.缺乏独特性和显著性&am…...

C#在类中申明成员数组变量的格式

在C#中&#xff0c;在类中申明成员数组变量的格式如下&#xff1a; 访问修饰符 数据类型[] 变量名; 其中&#xff1a; 访问修饰符表示该成员变量的访问权限&#xff0c;可以是public、private、protected、internal等修饰符之一&#xff1b;数据类型表示数组元素的类型&…...

通俗易懂理解CNN卷积神经网络模型的参数量和计算量

一、参考资料 神经网络参数量、计算量&#xff08;FLOPS&#xff09;、内存访问量&#xff08;MAC&#xff09;计算详解 5种方法获取Torch网络模型参数量计算量等信息 二、参数量与计算量相关介绍 1. 为什么要统计模型参数量和计算量 好的网络模型不仅要求精度准&#xff0…...

npm工具使用方法介绍

npm 使用方法 文章目录 npm 使用方法安装 npm初始化项目安装依赖更新依赖卸载依赖发布包其他命令下载相关 npm 是 Node.js 的包管理工具&#xff0c;用于管理 Node.js 项目的依赖关系。npm 提供了丰富的命令和功能&#xff0c;可以帮助开发者快速构建和部署 Node.js 应用程序。…...

使用Python批量修改PPT字体和提取全部文字到word

目录 一、修改PPT中每一页的字体二、将文本框中的字都放到word里 将一份PPT的每一页字体、大小、是否加粗都统一&#xff0c;是一个常见需求。特别是字体统一是高频、热点需求。在python操控PPT常用库python-pptx中有一个bug&#xff0c;对字体的修改只能修改数字和英文字母&am…...

Debezium系列之:在K8s集群中部署Debezium Operator运行Debezium Server的详细步骤

Debezium系列之:在K8s集群中部署Debezium Operator运行Debezium Server的详细步骤 一、背景二、目标三、准备环境四、运行本地 Kubernetes 集群五、认识K8s集群部署工具kind六、认识Kubernetes Operator六、安装docker七、安装kind八、安装kubectl九、使用kind创建k8s集群十、…...

并行和并发有什么区别?

并行和并发 并行和并发最早其实描述的是 Java 并发编程里面的概念。他们强调的是 CPU 处理任务的能力。简单来说&#xff1a; 并发&#xff0c;就是同一个时刻&#xff0c;CPU 能够处理的任务数量&#xff0c;并且对于应用程序来说&#xff0c;不会出现卡顿现象。并行&#x…...

第2篇 机器学习基础 —(3)机器学习库之Scikit-Learn

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。Scikit-Learn&#xff08;简称Sklearn&#xff09;是Python 的第三方模块&#xff0c;它是机器学习领域当中知名的Python 模块之一&#xff0c;它对常用的机器学习算法进行了封装&#xff0c;包括回归&#xff08;Regressi…...

正点原子嵌入式linux驱动开发——Linux SPI驱动

到目前为止的学习笔记&#xff0c;已经介绍了Linux下的platform总线框架、I2C总线框架&#xff0c;本篇笔记将介绍Linux下的SPI总线框架。与I2C总线一样&#xff0c;SPI是物理总线&#xff0c;也是一种很常用的串行通信协议。本章就来学习如何在Linux下编写SPI总线接口的设备驱…...

【计算机视觉】相机

文章目录 一、原始的相机&#xff1a;针孔相机&#xff08;Pinhole Camera&#xff09;二、针孔相机的数学模型三、真实相机四、透镜的缺陷 我的《计算机视觉》系列参考UC Berkeley的CS180课程&#xff0c;PPT可以在课程主页看到。 成像原理 一、原始的相机&#xff1a;针孔相机…...

Spring的条件注解,一篇文章盘得清清楚楚明明白白

前言 在Spring中&#xff0c;条件注解可根据特定的条件来决定是否创建或配置Bean&#xff0c;这些条件可以基于类、属性、环境等因素。通过使用条件注解&#xff0c;我们可以在Spring容器中更加灵活地管理和控制组件的创建和注入&#xff0c;帮助我们更加灵活地管理和控制Bean…...

Oracle (7)Online Redo Log Files

目录 一、Oracle Online Redo Log Files及其相关内容介绍 1、Online Redo Log Files简介 2、Online Redo Log Files特点 3、Online Redo Log Files文件组 4、多路复用文件 5、联机重做日志文件工作方式 6、LGWR什么时候写重做 7、LS和LSN 8、删除Redo文件成员 9、删除…...

物联网AI MicroPython传感器学习 之 PAJ7620手势识别传感器

学物联网&#xff0c;来万物简单IoT物联网&#xff01;&#xff01; 一、产品简介 手势识别传感器PAJ7620u2是一款集成3D手势识别和运动跟踪为一体的交互式传感器&#xff0c;传感器可以在有效范围内识别手指的顺时针/逆时针转动方向和手指的运动方向等。它可以识别13种手势&a…...

Affinity Photo 2.2.1 高端专业Mac PS修图软件

Affinity Photo Mac中文版是一款面向专业摄影师和其他视觉艺术家的专业图像处理软件&#xff0c;拥有众多专业高端功能&#xff0c;如Raw处理、PSD导入和导出、16位通道的编辑和ICC色彩管理以及兼容大量图片格式。是现在最快、最顺、最精准的专业修图软件。Affinity Photo Mac是…...

微服务-统一网关Gateway

网关的作用 对用户请求做身份认证、权限校验将用户请求路由到微服务&#xff0c;并实现负载均衡对用户请求做限流 搭建网关服务 创建新module&#xff0c;命名为Gateway&#xff0c;引入依赖&#xff08;1.SpringCloudGateway依赖&#xff1b;2.Eureka客户端依赖或者nacos的服…...

【音视频|wav】wav音频文件格式详解

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…...

网络工程综合试题(二)

1. SR技术有哪些缺点&#xff1f; SR&#xff08;Segment Routing&#xff09;技术是一种新兴的网络编程技术&#xff0c;它具有很多优点&#xff0c;但也存在一些缺点&#xff0c;包括&#xff1a; 部署复杂性&#xff1a;SR技术需要对网络进行改造和升级&#xff0c;包括更新…...

Android JNI/NDK 入门从一到二

1. 前言 最基础的创建JNI接口的操作&#xff0c;可以直接看这篇文章 : 第一个Android JNI工程&#xff0c; 本文会基于掌握创建JNI接口的操作的基础之上&#xff0c;来入门JNI/NDK。 2. 在JNI中打印日志 2.1 添加log模块 记得CMake中有log模块&#xff0c;不然编译不过 ta…...

吃瓜教程3|决策树

ID3算法 假定当前样本集合D中第k类样本所占比例为pk&#xff0c;则样本集合D的信息熵定义为 信息增益 C4.5算法 ID3算法存在一个问题&#xff0c;就是偏向于取值数目较多的属性&#xff0c;因此C4.5算法使用了“增益率”&#xff08;gain ratio&#xff09;来选择划分属性 CA…...

springboot动态数据源【非伪数据源】

说明&#xff1a;本文章的数据源不是在配置文件中配置两个或多个数据源&#xff0c;在业务方面对这些数据源来回切换&#xff0c;本文章中的数据源是可以动态添加&#xff0c;修改&#xff0c;切换的&#xff0c;废话不多说。 先看工程图&#xff1a; 1.pom.xml文件 <?x…...

如何改善设备综合效率(OEE)并提高工厂的生产力

在现代制造业中&#xff0c;提高设备综合效率&#xff08;Overall Equipment Efficiency&#xff0c;OEE&#xff09;是企业追求高效生产和优化生产能力的重要目标之一。OEE是一个关键的绩效指标&#xff0c;可以帮助企业评估设备的利用效率、生产效率和质量水平。本文将从三个…...

一文接入Android阿里Sophix热更新

最近公司项目渐趋成熟&#xff0c;已经不需要经常更新版本&#xff0c;并且更新版本对客户的影响特别大&#xff0c;但是日常维护难免需要更新代码&#xff0c;因此热修复的技术&#xff0c;就比较迫切了。 经过一段时间的对比&#xff0c;我们最终决定使用阿里的Sophix方案&am…...

【高阶数据结构】并查集和图

目录 1.数据结构--并查集 2.数据结构--图 1.图的基础概念 2.图的简单实现 2.1.邻接矩阵的图实现 2.2.邻接表的图实现 2.3.图的DFS和BFS 2.4.最小生成树 2.4.1.Kruskal(克鲁斯卡尔算法) 2.4.2.Prim&#xff08;普里姆算法&#xff09; 2.5.最短路径 2.5.1.Dijkstra(…...

Git 提交时提示 GPG 签名错误

本来应该一切都是正常的&#xff0c;但今天提交的时候提示 GPG 签名错误。 错误的信息就是 GPG 签名失败。 gpg: skipped "942395299055675C": No secret key gpg: signing failed: No secret key error: gpg failed to sign the data fatal: failed to write commi…...

vite+vue3实现 tomcat 的本地部署

背景&#xff1a; 很多开发小伙伴在本地开发完前端项目后&#xff0c;碍于服务端环境配置麻烦&#xff0c;想先试试在本地部署&#xff0c;已开发好的前端项目&#xff0c;由于很多文章都是文字性描述&#xff0c;不太直观&#xff0c;为了给大多数新手提供一个教程&#xff0c…...

docker+playwright

windows10 docker playwright 难点在于windows下docker的安装&#xff0c;以及官方hub被墙的困难。 wsl2 wsl2 ubuntu docker git clone https://gitee.com/lineuman/lcs_playwright.git npm install npx playwright test docker端口怎么映射到主机上面&#xff1f; 设置重…...

php框架路由实现

在PHP中也有很多框架&#xff08;如Laravel、CodeIgniter&#xff09;提供了路由功能。下面是一个简单的PHP路由实现原理和示例代码&#xff1a; 路由实现原理&#xff1a; 客户端发起请求&#xff0c;请求的URL会被传递给Web服务器。Web服务器将请求传递给PHP解释器&#xff…...

在CentOS 7中手工打造和运行xml文件配置的Servlet,然后使用curl、浏览器、telnet等三种工具各自测试

下载Openjdk并配置环境变量 https://jdk.java.net/java-se-ri/11-MR2是官网下载Openjdk 11的地方。 sudo wget https://download.java.net/openjdk/jdk11.0.0.1/ri/openjdk-11.0.0.1_linux-x64_bin.tar.gz下载openjdk 11。 sudo mkdir -p /usr/openjdk11创建目录&#xff…...

单例模式.

目录 ♫什么是单例模式 ♫饿汉式单例模式 ♫懒汉式单例模式 ♫单例模式的线程安全问题 ♪原子性 ♪内存可见性与指令重排序 ♫什么是单例模式 单例模式是一种设计模式&#xff0c;通过巧用Java的现有语法&#xff0c;实现一个只能被创建一个实例的类&#xff0c;并提供一个全…...