当前位置: 首页 > news >正文

Netty 入门学习

前言

学习Spark源码绕不开通信,Spark通信是基于Netty实现的,所以先简单学习总结一下Netty。

Spark 通信历史

最开始: Akka
Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题
Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全废弃Akka,全部使用Netty

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。
Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型

Netty

Server 主要代码:

// 创建ServerBootstrap实例,服务器启动对象
ServerBootstrap bootstrap = new ServerBootstrap();ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 等待服务器关闭
channelFuture.channel().closeFuture().sync();

主要是启动 ServerBootstrap、绑定端口、等待关闭。

Client 主要代码:

// 创建Bootstrap实例,客户端启动对象
Bootstrap bootstrap = new Bootstrap();
// 连接服务端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();

Server 添加 Handler

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ServerHandler());}
});
bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}
});

这里的 ServerHandler 和 ClientHandler 都是自己实现的类,处理具体的逻辑。
如channelActive 建立连接时发消息给服务器,channelRead 读取数据时调用,处理读取数据的逻辑。给服务器或者客户端发消息可以用 writeAndFlush 方法。

完整代码

地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo

NettyServer

package com.dkl.java.demo;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {try {bind();} catch (InterruptedException e) {throw new RuntimeException(e);}}public static void bind() throws InterruptedException {// 创建boss线程组,用于接收连接EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 创建worker线程组,用于处理连接上的I/O操作,含有子线程NioEventGroup个数为CPU核数大小的2倍EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建ServerBootstrap实例,服务器启动对象ServerBootstrap bootstrap = new ServerBootstrap();// 使用链式编程配置参数// 将boss线程组和worker线程组暂存到ServerBootstrapbootstrap.group(bossGroup, workerGroup);// 设置服务端Channel类型为NioServerSocketChannel作为通道实现bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 添加ServerHandler到ChannelPipeline,对workerGroup的SocketChannel(客户端)设置处理器socketChannel.pipeline().addLast(new ServerHandler());}});// 设置启动参数,初始化服务器连接队列大小。服务端处理客户端连接请求是顺序处理,一个时间内只能处理一个客户端请求// 当有多个客户端同时来请求时,未处理的请求先放入队列中bootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 绑定端口并启动服务器,bind方法是异步的,sync方法是等待异步操作执行完成,返回ChannelFuture异步对象ChannelFuture channelFuture = bootstrap.bind(8888).sync();// 等待服务器关闭channelFuture.channel().closeFuture().sync();} finally {// 优雅地关闭boss线程组bossGroup.shutdownGracefully();// 优雅地关闭worker线程组workerGroup.shutdownGracefully();}}
}

ServerHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用** @param ctx* @throws Exception*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelRegistered");}/*** 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调* 用** @param ctx* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelUnregistered");}/*** 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelActive");}/*** 当 Channel 离开活动状态并且不再连接它的远程节点时被调用** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelInactive");}/*** 当从 Channel 读取数据时被调用** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("执行 channelRead");// 处理接收到的数据ByteBuf byteBuf = (ByteBuf) msg;try {// 将接收到的字节数据转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("Server端收到客户消息: " + message);// 发送响应消息给客户端ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,我收到你的消息啦~", CharsetUtil.UTF_8));} finally {// 释放ByteBuf资源ReferenceCountUtil.release(byteBuf);}}/*** 当 Channel 上的一个读操作完成时被调用,对通道的读取完成的事件或通知。当读取完成可通知发送方或其他的相关方,告诉他们接受方读取完成** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelReadComplete");}/*** 当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被* 调用** @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {System.out.println("执行 userEventTriggered");}/*** 当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable()方法* * 来检测 Channel 的可写性。与可写性相关的阈值可以通过* * Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法来* * 设置** @param ctx* @throws Exception*/@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelWritabilityChanged");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("执行 exceptionCaught");// 异常处理cause.printStackTrace();ctx.close();}
}

NettyClient

package com.dkl.java.demo;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) {start();}public static void start() {// 创建EventLoopGroup,用于处理客户端的I/O操作EventLoopGroup groupThread = new NioEventLoopGroup();try {// 创建Bootstrap实例,客户端启动对象Bootstrap bootstrap = new Bootstrap();bootstrap.group(groupThread);// 设置服务端Channel类型为NioSocketChannel作为通道实现bootstrap.channel(NioSocketChannel.class);// 设置客户端处理bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}});// 绑定端口ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 优雅地关闭线程groupThread.shutdownGracefully();}}
}

ClientHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 连接建立时的处理,发送请求消息给服务器ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!我是客户端,测试通道连接", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 处理接收到的数据ByteBuf byteBuf = (ByteBuf) msg;try {// 将接收到的字节数据转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("受到服务端响应的消息: " + message);// TODO: 对数据进行业务处理} finally {// 释放ByteBuf资源ReferenceCountUtil.release(byteBuf);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 异常处理cause.printStackTrace();ctx.close();}
}

运行截图


handler 执行顺序

Server 端

连接时:执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete断开连接时:执行 channelReadComplete
(强制中断 Client 连接
执行 exceptionCaught
执行 userEventTriggered (exceptionCaught 中 ctx.close()) 触发
)
执行 channelInactive
执行 channelUnregisteredchannelReadComplete 中 ctx.close(); 触发:
执行 channelInactive
执行 channelUnregistered

Client 端

执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete

Spark 对应位置

  • Spark版本:3.2.3
  • Server: org.apache.spark.network.server.TransportServer.init
  • Client: org.apache.spark.network.client.TransportClientFactory.createClient


相关文章:

Netty 入门学习

前言 学习Spark源码绕不开通信&#xff0c;Spark通信是基于Netty实现的&#xff0c;所以先简单学习总结一下Netty。 Spark 通信历史 最开始: Akka Spark 1.3&#xff1a; 开始引入Netty&#xff0c;为了解决大块数据&#xff08;如Shuffle&#xff09;的传输问题 Spark 1.6&…...

Magentic-One、AutoGen、LangGraph、CrewAI 或 OpenAI Swarm:哪种多 AI 代理框架最好?

目录 一、说明 二、 AutoGen-自动生成&#xff08;微软&#xff09; 2.1 特征 2.2 局限性 三、 CrewAI 3.1 特征 3.2 限制&#xff1a; 四、LangGraph 4.1 特征&#xff1a; 4.2 限制&#xff1a; 五、OpenAI Swarm 5.1 特征 5.2 限制 六、Magentic-One 6.1 特征 6.2 限制 七、…...

openstack下如何生成centos9 centos10 和Ubuntu24 镜像

如何生成一个centos 10和centos 9 的镜像1. 下载 对应的版本 wget https://cloud.centos.org/centos/10-stream/x86_64/images/CentOS-Stream-GenericCloud-x86_64-10-latest.x86_64.qcow2 wget https://cloud.centos.org/centos/9-stream/x86_64/images/CentOS-Stream-Gener…...

Kivy App开发之UX控件Slider滑块

在app中可能会调节如音量,亮度等,可以使用Slider来实现,该控件调用方便,兼容性好,滑动平稳。在一些参数设置中,也可以用来调整数值。 支持水平和垂直方向,可以设置默认值,最小及最大值。 使用方法,需用引入Slider类,通过Slider类生成一个滑块并设置相关的样式后,再…...

CSS——22.静态伪类(伪类是选择不同元素状态)

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>静态伪类</title> </head><body><a href"#">我爱学习</a></body> </html>单击链接前的样式 左键单击&#xff08;且…...

python学opencv|读取图像(三十)使用cv2.getAffineTransform()函数倾斜拉伸图像

【1】引言 前序已经学习了如何平移和旋转缩放图像&#xff0c;相关文章链接为&#xff1a; python学opencv|读取图像&#xff08;二十七&#xff09;使用cv2.warpAffine&#xff08;&#xff09;函数平移图像-CSDN博客 python学opencv|读取图像&#xff08;二十八&#xff0…...

Unity3D中基于ILRuntime的组件化开发详解

前言 在Unity3D开发中&#xff0c;组件化开发是一种高效且灵活的软件架构方式。通过将游戏功能拆分为独立的、可重用的组件&#xff0c;开发者可以更容易地管理、扩展和维护代码。而ILRuntime作为一款基于C#的热更新框架&#xff0c;为Unity3D开发者提供了一种高效的热更新和组…...

ELK的搭建

ELK elk&#xff1a;elasticsearch logstatsh kibana统一日志收集系统 elasticsearch&#xff1a;分布式的全文索引引擎点非关系型数据库,存储所有的日志信息&#xff0c;主和从&#xff0c;最少需要2台 logstatsh&#xff1a;动态的从各种指定的数据源&#xff0c;获取数据…...

国产信创实践(国能磐石服务器操作系统CEOS +东方通TongHttpServer)

替换介绍&#xff1a; 国能磐石服务器操作系统CEOS 对标 Linux 服务器操作系统&#xff08;Ubuntu, CentOS&#xff09; 东方通TongHttpServer 对标 Nginx 负载均衡Web服务器 第一步&#xff1a; 服务器安装CEOS映像文件&#xff0c;可直接安装&#xff0c;本文采用使用VMware …...

C#里使用libxl读取EXCEL文件里的图片并保存出来

有时候需要读取EXCEL里的图片文件, 因为很多用户喜欢使用图片保存在EXCEL里,比如用户保存一些现场整改的图片。 如果需要把这些图片抽取出来,再保存到系统里,就需要读取这些图片数据,生成合适的文件再保存。 在libxl里也提供了这样的方法, 如下: var picType = boo…...

【开源免费】基于SpringBoot+Vue.JS企业级工位管理系统(JAVA毕业设计)

本文项目编号 T 127 &#xff0c;文末自助获取源码 \color{red}{T127&#xff0c;文末自助获取源码} T127&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...

美国大学的计算机科学专业排名

美国的计算机科学专业在全球范围内享有盛誉&#xff0c;许多大学在该领域具有卓越的教学和研究实力。以下是根据最新的排名和信息整理的美国计算机科学专业顶尖大学列表&#xff1a; 2025年 U.S. News 美国本科计算机科学专业排名&#xff1a; 斯坦福大学&#xff08;Stanfor…...

机器学习实战——决策树:从原理到应用的深度解析

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​​​ ​​​ ​​ 决策树&#xff08;Decision Tree&#xff09;是一种简单而直观的分类与回归模型&#xff0c;在机器学习中广泛应用。它的…...

开源生成式物理引擎Genesis,可模拟世界万物

这是生成大模型时代 —— 它们能生成文本、图像、音频、视频、3D 对象…… 而如果将所有这些组合到一起&#xff0c;我们可能会得到一个世界&#xff01; 现在&#xff0c;不管是 LeCun 正在探索的世界模型&#xff0c;还是李飞飞想要攻克的空间智能&#xff0c;又或是其他研究…...

kubernetes第七天

1.影响pod调度的因素 nodeName 节点名 resources 资源限制 hostNetwork 宿主机网络 污点 污点容忍 Pod亲和性 Pod反亲和性 节点亲和性 2.污点 通常是作用于worker节点上&#xff0c;其可以影响pod的调度 语法&#xff1a;key[value]:effect effect:[ɪˈfek…...

RK3588上CPU和GPU算力以及opencv resize的性能对比测试

RK3588上CPU和GPU算力以及opencv resize的性能对比测试 一.背景二.小结三.相关链接四.操作步骤1.环境搭建A.安装依赖B.设置GPU为高性能模式C.获取GPU信息D.获取CPU信息 2.调用OpenCL SDK获取GPU信息3.使用OpenCL API计算矩阵乘4.使用clpeak测试GPU的性能5.使用OpenBLAS测试CPU的…...

基于Centos 7系统的安全加固方案

创作不易&#xff0c;麻烦点个免费的赞和关注吧&#xff01; 声明&#xff01; 免责声明&#xff1a;本教程作者及相关参与人员对于任何直接或间接使用本教程内容而导致的任何形式的损失或损害&#xff0c;包括但不限于数据丢失、系统损坏、个人隐私泄露或经济损失等&#xf…...

IT行业的发展趋势

一、引言 IT&#xff08;信息技术&#xff09;行业自诞生以来&#xff0c;就以惊人的速度发展&#xff0c;不断改变着我们的生活、工作和社会结构。如今&#xff0c;随着技术的持续创新、市场需求的演变以及全球经济格局的变化&#xff0c;IT行业正迈向新的发展阶段&#xff0…...

《探秘开源多模态神经网络模型:AI 新时代的万能钥匙》

《探秘开源多模态神经网络模型&#xff1a;AI 新时代的万能钥匙》 一、多模态模型的崛起之路&#xff08;一&#xff09;从单一到多元&#xff1a;模态的融合演进&#xff08;二&#xff09;关键技术突破&#xff1a;解锁多模态潜能 二、开源多模态模型深度剖析&#xff08;一&…...

ROS核心概念解析:从Node到Master,再到roslaunch的全面指南

Node 在ROS中&#xff0c;最小的进程单元就是节点&#xff08;node&#xff09;。一个软件包里可以有多个可执行文件&#xff0c;可执行文件在运行之后就成了一个进程(process)&#xff0c;这个进程在ROS中就叫做节点。 从程序角度来说&#xff0c;node就是一个可执行文件&…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?

Golang 面试经典题&#xff1a;map 的 key 可以是什么类型&#xff1f;哪些不可以&#xff1f; 在 Golang 的面试中&#xff0c;map 类型的使用是一个常见的考点&#xff0c;其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...

AGain DB和倍数增益的关系

我在设置一款索尼CMOS芯片时&#xff0c;Again增益0db变化为6DB&#xff0c;画面的变化只有2倍DN的增益&#xff0c;比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析&#xff1a; 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...

省略号和可变参数模板

本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

windows系统MySQL安装文档

概览&#xff1a;本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容&#xff0c;为学习者提供全面的操作指导。关键要点包括&#xff1a; 解压 &#xff1a;下载完成后解压压缩包&#xff0c;得到MySQL 8.…...

Python 训练营打卡 Day 47

注意力热力图可视化 在day 46代码的基础上&#xff0c;对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...

【UE5 C++】通过文件对话框获取选择文件的路径

目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 &#xff0c;这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器&#xff0c;右键点击 .uproject 文件&#xff0c;选择 "Generate Visual Studio project files"&#xff0c;重…...

人工智能 - 在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型

在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型。这些平台各有侧重&#xff0c;适用场景差异显著。下面我将从核心功能定位、典型应用场景、真实体验痛点、选型决策关键点进行拆解&#xff0c;并提供具体场景下的推荐方案。 一、核心功能定位速览 平台核心定位技术栈亮…...