29.Netty源码之服务端启动:创建EventLoopSelector流程
highlight: arduino-light
源码篇:从 Linux 出发深入剖析服务端启动流程
通过前几章课程的学习,我们已经对 Netty 的技术思想和基本原理有了初步的认识,从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式让你更加深入理解 Netty 的精髓,如 Netty 的设计思想、工程技巧等,为之后继续深入研究 Netty 打下坚实的基础。
在课程开始之前,我想分享一下关于源码学习的几点经验和建议。
第一,很多同学在开始学习源码时面临的第一个问题就是不知道从何下手,这个时候一定不能对着源码毫无意义地四处翻看。建议你可以通过 Hello World 或者 TestCase 作为源码学习的入口,然后再通过 Debug 断点的方式调试并跑通源码。
第二,阅读源码一定要有全局观。首先要把握源码的主流程,避免刚开始陷入代码细节的死胡同。
第三,源码一定要反复阅读,让自己每一次读都有不同的收获。我们可以通过画图、注释的方式帮助自己更容易理解源码的核心流程,方便后续的复习和回顾。
作为源码解析的第一节课,我们将深入分析 Netty 服务端的启动流程。启动服务的过程中我们可以了解到 Netty 各大核心组件的关系,这将是学习 Netty 源码一个非常好的切入点,让我们一起看看 Netty 的每个零件是如何运转起来的吧。
调试示例代码
位于netty-example工程下的handler工程的 io.netty.example.echo包下
java package io.netty.example.echo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { // Configure SSL.git final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { System.out.println("客户端bootstrap.handler()方法中指定的处理器被调用"); ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } // p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } }
java package io.netty.example.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Handler implementation for the echo client. It initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */ public class EchoClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf firstMessage; /** * Creates a client-side handler. */ public EchoClientHandler() { firstMessage = Unpooled.buffer(EchoClient.SIZE); for (int i = 0; i < firstMessage.capacity(); i ++) { firstMessage.writeByte((byte) i); } firstMessage.writeByte((byte) 98); } @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("EchoClientHandler.channelActive"); ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("客户端EchoClientHandler#channelRead被调用"); System.out.println("客户端收到数据" + msg); System.out.println("客户端发送数据" + msg); ctx.writeAndFlush(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
java package io.netty.example.echo; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.ClientLoggingHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.ServerLoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; /** * Echoes back any received data from a client. */ public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new ServerLoggingHandler(LogLevel.INFO)) //两种设置keepalive风格 .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(NioChannelOption.SO_KEEPALIVE, true) //切换到unpooled的方式之一 .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ClientLoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
java package io.netty.example.echo; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.util.Date; import java.util.concurrent.TimeUnit; /** * Handler implementation for the echo server. */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) { ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { System.out.println(new Date().toString() + ":服务器收到消息"); try { Thread.sleep(2 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8)); System.out.println("channel code=" + ctx.channel().hashCode()); } catch (Exception ex) { System.out.println("发生异常" + ex.getMessage()); } } }, 5, TimeUnit.SECONDS); //ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
java package io.netty.handler.logging; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPromise; import io.netty.util.internal.logging.InternalLogLevel; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump; import static io.netty.util.internal.StringUtil.NEWLINE; /** * A {@link ChannelHandler} that logs all events using a logging framework. * By default, all events are logged at <tt>DEBUG</tt> level. */ @Sharable @SuppressWarnings({ "StringConcatenationInsideStringBufferAppend", "StringBufferReplaceableByString" }) public class ServerLoggingHandler extends ChannelDuplexHandler { private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG; protected final InternalLogger logger; protected final InternalLogLevel internalLevel; private final LogLevel level; /** * Creates a new instance whose logger name is the fully qualified class * name of the instance with hex dump enabled. */ public ServerLoggingHandler() { this(DEFAULT_LEVEL); } /** * Creates a new instance whose logger name is the fully qualified class * name of the instance. * * @param level the log level */ public ServerLoggingHandler(LogLevel level) { if (level == null) { throw new NullPointerException("level"); } logger = InternalLoggerFactory.getInstance(getClass()); this.level = level; internalLevel = level.toInternalLevel(); } /** * Creates a new instance with the specified logger name and with hex dump * enabled. * * @param clazz the class type to generate the logger for */ public ServerLoggingHandler(Class<?> clazz) { this(clazz, DEFAULT_LEVEL); } /** * Creates a new instance with the specified logger name. * * @param clazz the class type to generate the logger for * @param level the log level */ public ServerLoggingHandler(Class<?> clazz, LogLevel level) { if (clazz == null) { throw new NullPointerException("clazz"); } if (level == null) { throw new NullPointerException("level"); } logger = InternalLoggerFactory.getInstance(clazz); this.level = level; internalLevel = level.toInternalLevel(); } /** * Creates a new instance with the specified logger name using the default log level. * * @param name the name of the class to use for the logger */ public ServerLoggingHandler(String name) { this(name, DEFAULT_LEVEL); } /** * Creates a new instance with the specified logger name. * * @param name the name of the class to use for the logger * @param level the log level */ public ServerLoggingHandler(String name, LogLevel level) { if (name == null) { throw new NullPointerException("name"); } if (level == null) { throw new NullPointerException("level"); } logger = InternalLoggerFactory.getInstance(name); this.level = level; internalLevel = level.toInternalLevel(); } /** * Returns the {@link LogLevel} that this handler uses to log */ public LogLevel level() { return level; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "REGISTERED")); } ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "UNREGISTERED")); } ctx.fireChannelUnregistered(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "ACTIVE")); } ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "INACTIVE")); } ctx.fireChannelInactive(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "EXCEPTION", cause), cause); } ctx.fireExceptionCaught(cause); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "USER_EVENT", evt)); } ctx.fireUserEventTriggered(evt); } @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND", localAddress)); } ctx.bind(localAddress, promise); } @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "CONNECT", remoteAddress, localAddress)); } ctx.connect(remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "DISCONNECT")); } ctx.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "CLOSE")); } ctx.close(promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "DEREGISTER")); } ctx.deregister(promise); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "READ COMPLETE")); } ctx.fireChannelReadComplete(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器端方法中指定的ServerLoggingHandler被调用"); if (logger.isEnabled(internalLevel)) { //这里被注释了 // logger.log(internalLevel, format(ctx, "READ", msg)); } ctx.fireChannelRead(msg); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "WRITE", msg)); } ctx.write(msg, promise); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "WRITABILITY CHANGED")); } ctx.fireChannelWritabilityChanged(); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "FLUSH")); } ctx.flush(); } /** * Formats an event and returns the formatted message. * * @param eventName the name of the event */ protected String format(ChannelHandlerContext ctx, String eventName) { String chStr = ctx.channel().toString(); return new StringBuilder(chStr.length() + 1 + eventName.length()) .append(chStr) .append(' ') .append(eventName) .toString(); } /** * Formats an event and returns the formatted message. * * @param eventName the name of the event * @param arg the argument of the event */ protected String format(ChannelHandlerContext ctx, String eventName, Object arg) { if (arg instanceof ByteBuf) { return formatByteBuf(ctx, eventName, (ByteBuf) arg); } else if (arg instanceof ByteBufHolder) { return formatByteBufHolder(ctx, eventName, (ByteBufHolder) arg); } else { return formatSimple(ctx, eventName, arg); } } /** * Formats an event and returns the formatted message. This method is currently only used for formatting * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}. * * @param eventName the name of the event * @param firstArg the first argument of the event * @param secondArg the second argument of the event */ protected String format(ChannelHandlerContext ctx, String eventName, Object firstArg, Object secondArg) { if (secondArg == null) { return formatSimple(ctx, eventName, firstArg); } String chStr = ctx.channel().toString(); String arg1Str = String.valueOf(firstArg); String arg2Str = secondArg.toString(); StringBuilder buf = new StringBuilder( chStr.length() + 1 + eventName.length() + 2 + arg1Str.length() + 2 + arg2Str.length()); buf.append(chStr).append(' ').append(eventName).append(": ").append(arg1Str).append(", ").append(arg2Str); return buf.toString(); } /** * Generates the default log message of the specified event whose argument is a {@link ByteBuf}. */ private static String formatByteBuf(ChannelHandlerContext ctx, String eventName, ByteBuf msg) { String chStr = ctx.channel().toString(); int length = msg.readableBytes(); if (length == 0) { StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 4); buf.append(chStr).append(' ').append(eventName).append(": 0B"); return buf.toString(); } else { int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4; StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 2 + 10 + 1 + 2 + rows * 80); buf.append(chStr).append(' ').append(eventName).append(": ").append(length).append('B').append(NEWLINE); appendPrettyHexDump(buf, msg); return buf.toString(); } } /** * Generates the default log message of the specified event whose argument is a {@link ByteBufHolder}. */ private static String formatByteBufHolder(ChannelHandlerContext ctx, String eventName, ByteBufHolder msg) { String chStr = ctx.channel().toString(); String msgStr = msg.toString(); ByteBuf content = msg.content(); int length = content.readableBytes(); if (length == 0) { StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 2 + msgStr.length() + 4); buf.append(chStr).append(' ').append(eventName).append(", ").append(msgStr).append(", 0B"); return buf.toString(); } else { int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4; StringBuilder buf = new StringBuilder( chStr.length() + 1 + eventName.length() + 2 + msgStr.length() + 2 + 10 + 1 + 2 + rows * 80); buf.append(chStr).append(' ').append(eventName).append(": ") .append(msgStr).append(", ").append(length).append('B').append(NEWLINE); appendPrettyHexDump(buf, content); return buf.toString(); } } /** * Generates the default log message of the specified event whose argument is an arbitrary object. */ private static String formatSimple(ChannelHandlerContext ctx, String eventName, Object msg) { String chStr = ctx.channel().toString(); String msgStr = String.valueOf(msg); StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 2 + msgStr.length()); return buf.append(chStr).append(' ').append(eventName).append(": ").append(msgStr).toString(); } }
java /* * Copyright 2012 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package io.netty.handler.logging; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPromise; import io.netty.util.internal.logging.InternalLogLevel; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump; import static io.netty.util.internal.StringUtil.NEWLINE; /** * A {@link ChannelHandler} that logs all events using a logging framework. * By default, all events are logged at <tt>DEBUG</tt> level. */ @Sharable @SuppressWarnings({ "StringConcatenationInsideStringBufferAppend", "StringBufferReplaceableByString" }) public class ClientLoggingHandler extends ChannelDuplexHandler { private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG; protected final InternalLogger logger; protected final InternalLogLevel internalLevel; private final LogLevel level; /** * Creates a new instance whose logger name is the fully qualified class * name of the instance with hex dump enabled. */ public ClientLoggingHandler() { this(DEFAULT_LEVEL); } /** * Creates a new instance whose logger name is the fully qualified class * name of the instance. * * @param level the log level */ public ClientLoggingHandler(LogLevel level) { if (level == null) { throw new NullPointerException("level"); } logger = InternalLoggerFactory.getInstance(getClass()); this.level = level; internalLevel = level.toInternalLevel(); } /** * Creates a new instance with the specified logger name and with hex dump * enabled. * * @param clazz the class type to generate the logger for */ public ClientLoggingHandler(Class<?> clazz) { this(clazz, DEFAULT_LEVEL); } /** * Creates a new instance with the specified logger name. * * @param clazz the class type to generate the logger for * @param level the log level */ public ClientLoggingHandler(Class<?> clazz, LogLevel level) { if (clazz == null) { throw new NullPointerException("clazz"); } if (level == null) { throw new NullPointerException("level"); } logger = InternalLoggerFactory.getInstance(clazz); this.level = level; internalLevel = level.toInternalLevel(); } /** * Creates a new instance with the specified logger name using the default log level. * * @param name the name of the class to use for the logger */ public ClientLoggingHandler(String name) { this(name, DEFAULT_LEVEL); } /** * Creates a new instance with the specified logger name. * * @param name the name of the class to use for the logger * @param level the log level */ public ClientLoggingHandler(String name, LogLevel level) { if (name == null) { throw new NullPointerException("name"); } if (level == null) { throw new NullPointerException("level"); } logger = InternalLoggerFactory.getInstance(name); this.level = level; internalLevel = level.toInternalLevel(); } /** * Returns the {@link LogLevel} that this handler uses to log */ public LogLevel level() { return level; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "REGISTERED")); } ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "UNREGISTERED")); } ctx.fireChannelUnregistered(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "ACTIVE")); } ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "INACTIVE")); } ctx.fireChannelInactive(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "EXCEPTION", cause), cause); } ctx.fireExceptionCaught(cause); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "USER_EVENT", evt)); } ctx.fireUserEventTriggered(evt); } @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND", localAddress)); } ctx.bind(localAddress, promise); } @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "CONNECT", remoteAddress, localAddress)); } ctx.connect(remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "DISCONNECT")); } ctx.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "CLOSE")); } ctx.close(promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "DEREGISTER")); } ctx.deregister(promise); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "READ COMPLETE")); } ctx.fireChannelReadComplete(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器端方法中指定的ClientLoggingHandler被调用"); if (logger.isEnabled(internalLevel)) { //这里被注释了 // logger.log(internalLevel, format(ctx, "READ", msg)); } ctx.fireChannelRead(msg); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "WRITE", msg)); } ctx.write(msg, promise); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "WRITABILITY CHANGED")); } ctx.fireChannelWritabilityChanged(); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "FLUSH")); } ctx.flush(); } /** * Formats an event and returns the formatted message. * * @param eventName the name of the event */ protected String format(ChannelHandlerContext ctx, String eventName) { String chStr = ctx.channel().toString(); return new StringBuilder(chStr.length() + 1 + eventName.length()) .append(chStr) .append(' ') .append(eventName) .toString(); } /** * Formats an event and returns the formatted message. * * @param eventName the name of the event * @param arg the argument of the event */ protected String format(ChannelHandlerContext ctx, String eventName, Object arg) { if (arg instanceof ByteBuf) { return formatByteBuf(ctx, eventName, (ByteBuf) arg); } else if (arg instanceof ByteBufHolder) { return formatByteBufHolder(ctx, eventName, (ByteBufHolder) arg); } else { return formatSimple(ctx, eventName, arg); } } /** * Formats an event and returns the formatted message. This method is currently only used for formatting * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}. * * @param eventName the name of the event * @param firstArg the first argument of the event * @param secondArg the second argument of the event */ protected String format(ChannelHandlerContext ctx, String eventName, Object firstArg, Object secondArg) { if (secondArg == null) { return formatSimple(ctx, eventName, firstArg); } String chStr = ctx.channel().toString(); String arg1Str = String.valueOf(firstArg); String arg2Str = secondArg.toString(); StringBuilder buf = new StringBuilder( chStr.length() + 1 + eventName.length() + 2 + arg1Str.length() + 2 + arg2Str.length()); buf.append(chStr).append(' ').append(eventName).append(": ").append(arg1Str).append(", ").append(arg2Str); return buf.toString(); } /** * Generates the default log message of the specified event whose argument is a {@link ByteBuf}. */ private static String formatByteBuf(ChannelHandlerContext ctx, String eventName, ByteBuf msg) { String chStr = ctx.channel().toString(); int length = msg.readableBytes(); if (length == 0) { StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 4); buf.append(chStr).append(' ').append(eventName).append(": 0B"); return buf.toString(); } else { int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4; StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 2 + 10 + 1 + 2 + rows * 80); buf.append(chStr).append(' ').append(eventName).append(": ").append(length).append('B').append(NEWLINE); appendPrettyHexDump(buf, msg); return buf.toString(); } } /** * Generates the default log message of the specified event whose argument is a {@link ByteBufHolder}. */ private static String formatByteBufHolder(ChannelHandlerContext ctx, String eventName, ByteBufHolder msg) { String chStr = ctx.channel().toString(); String msgStr = msg.toString(); ByteBuf content = msg.content(); int length = content.readableBytes(); if (length == 0) { StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 2 + msgStr.length() + 4); buf.append(chStr).append(' ').append(eventName).append(", ").append(msgStr).append(", 0B"); return buf.toString(); } else { int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4; StringBuilder buf = new StringBuilder( chStr.length() + 1 + eventName.length() + 2 + msgStr.length() + 2 + 10 + 1 + 2 + rows * 80); buf.append(chStr).append(' ').append(eventName).append(": ") .append(msgStr).append(", ").append(length).append('B').append(NEWLINE); appendPrettyHexDump(buf, content); return buf.toString(); } } /** * Generates the default log message of the specified event whose argument is an arbitrary object. */ private static String formatSimple(ChannelHandlerContext ctx, String eventName, Object msg) { String chStr = ctx.channel().toString(); String msgStr = String.valueOf(msg); StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 2 + msgStr.length()); return buf.append(chStr).append(' ').append(eventName).append(": ").append(msgStr).toString(); } }
服务器调试示例结果
java 反射创建channelpublic io.netty.channel.socket.nio.NioServerSocketChannel() 创建pipeline 设置非阻塞模式 开始初始化channel 服务器端添加匿名处理器 do Register 注册事件 0 invokeHandlerAddedIfNeeded:添加处理器 invokeHandlerAddedIfNeeded->ChannelInitializer.initChannel(ChannelHandlerContext) 调用服务器端匿名处理器 服务器端添加serverBootstrap.handler()方法中指定的处理器 触发事件:fireChannelRegistered ServerLoggingHandler.channelRegistered 服务器端异步添加ServerBootstrapAcceptor ServerLoggingHandler.bind 触发事件:fireChannelActive ServerLoggingHandler.channelActive doBeginRead注册事件:16 ------------服务器channel初始化完成-------------- 创建pipeline 设置非阻塞模式 触发事件:fireChannelRead 服务器端方法中指定的ServerLoggingHandler被调用 服务器端ServerBootstrapAcceptor被调用 do Register 注册事件 0 invokeHandlerAddedIfNeeded:添加处理器 invokeHandlerAddedIfNeeded->ChannelInitializer.initChannel(ChannelHandlerContext) 触发事件:fireChannelRegistered 11:09:25.118 [nioEventLoopGroup-3-1] INFO i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] REGISTERED 触发事件:fireChannelActive 11:09:25.118 [nioEventLoopGroup-2-1] INFO i.n.h.logging.ServerLoggingHandler - [id: 0xd5ededb0, L:/0:0:0:0:0:0:0:0:8007] READ COMPLETE 11:09:25.127 [nioEventLoopGroup-3-1] INFO i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] ACTIVE doBeginRead注册事件:1 ------------客户端channel初始化完成-------------- 触发事件:fireChannelRead //服务器收到消息 ClientLoggingHandler#channelRead 11:09:25.187 [nioEventLoopGroup-3-1] INFO i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] READ COMPLETE 11:09:25.187 [nioEventLoopGroup-3-1] INFO i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] FLUSH Fri Sep 03 11:09:30 CST 2021:服务器收到消息 11:09:32.220 [nioEventLoopGroup-3-1] INFO i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] WRITE: 29B//服务器响应向客户端写消息 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f 2c 20 e5 ae a2 e6 88 b7 e7 ab af |hello, .........| |00000010| 7e 28 3e 5e cf 89 5e 3c 29 e5 96 b5 34 |~(>^..^<)...4 | +--------+-------------------------------------------------+----------------+ 11:09:32.235 [nioEventLoopGroup-3-1] INFO i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] FLUSH
客户端调试示例结果
java 反射创建channelpublic io.netty.channel.socket.nio.NioSocketChannel() 创建pipeline 设置非阻塞模式 开始初始化channel 客户端channel添加bootstrap.handler()方法指定的处理器 do Register 注册事件 0 invokeHandlerAddedIfNeeded:添加处理器 invokeHandlerAddedIfNeeded->ChannelInitializer.initChannel(ChannelHandlerContext) 客户端bootstrap.handler()方法中指定的处理器被调用 触发事件:fireChannelRegistered 完成连接 finishConnect 触发事件:fireChannelActive EchoClientHandler.channelActive//第一次向服务器发送消息 doBeginRead注册事件:1 ------------channel初始化完成-------------- 触发事件:fireChannelRead//客户端收到消息 客户端EchoClientHandler#channelRead被调用 客户端收到数据PooledUnsafeDirectByteBuf(ridx: 0, widx: 29, cap: 1024) 客户端发送数据PooledUnsafeDirectByteBuf(ridx: 0, widx: 29, cap: 1024)
addLast方法
java @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { //handlers return addLast(null, handlers); }
``` @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); }
for (ChannelHandler h: handlers) {if (h == null) {break;}//handlersaddLast(executor, null, h);}return this;
}
```
```java @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}//新添加的handler也会被执行initChannel方法callHandlerAdded0(newCtx);return this;
}
```
创建EventLoop&Selector流程
java EventLoopGroup workerGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup();
```java public class NioEventLoopGroup extends MultithreadEventLoopGroup { //默认构造方法 public NioEventLoopGroup() { this(0); }
public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);
}
} ```
1.根据系统获取SelectorProvider
```java public NioEventLoopGroup(int nThreads, Executor executor) { //SelectorProvider.provider() //根据不同的系统创建不同的Selector 或者是说jdk不同 //Linux 下JOK 的下载和安装与Windows 下并没有太大的不同,只是对一些环境的设置稍有不同。 //在windows环境下的是 WindowsSelectorProvider this(nThreads, executor, SelectorProvider.provider()); }
//SelectorProvider.provider()
//1.读取配置根据配置的class获取provider 獲取不到的话分支走到第二步
//2.通过spi获取provider 获取不到到第三步
//3.DefaultSelectorProvider#create创建provider
//根据不同的系统创建不同的Selector 或者是说jdk不同
//Linux 下JOK 的下载和安装与Windows 下并没有太大的不同,只是对一些环境的设置稍有不同。
//在windows环境下的是 WindowsSelectorProvider
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction () { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}
// sun.nio.ch.DefaultSelectorProvider.create(); // 不同的系统根据jdk有不同的实现 public static SelectorProvider create() { return new WindowsSelectorProvider(); } ```
2.设置线程池数量,默认cpu数量*2
```java public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); }
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
//默认创建的线程数是 cpu核数 * 2
// DEFAULTEVENTLOOPTHREADS = Math.max(1, SystemPropertyUtil.getInt( // "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); //args是可变参 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULTEVENTLOOPTHREADS : nThreads, executor, args); }
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } ```
3.循环创建NioEventLoop
```java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException (String.format("nThreads: %d (expected: > 0)", nThreads)); }
if (executor == null) {//一个task一个threadexecutor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}//children是MultithreadEventExecutorGroup的属性//初始化数组 长度为线程数量children = new EventExecutor[nThreads];//遍历nThreads 循环创建EventExecutor//EventLoop继承自EventExecutorfor (int i = 0; i < nThreads; i ++) {boolean success = false;try {//EventExecutor//其实是1个 NioEventLoopchildren[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}chooser = chooserFactory.newChooser(children);final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
```
4.初始化NioEventLoop打开selector
```java
//返回的是EventLoop 实际是 NioEventLoop //io.netty.channel.nio.NioEventLoopGroup#newChild @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); }
//构造函数 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}provider = selectorProvider;//通过provider获取selector//selectorTuple是selector的包装类//获取到selectorTuple final SelectorTuple selectorTuple = openSelector();//selectorTuple 获取 selectorselector = selectorTuple.selector;unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy;
}
```
5.NioEventLoop的run方法
因为它继承自Excutor类 所以还要关注它的run方法。
在 Netty 中 EventLoop 可以理解为 Reactor 线程模型的事件处理引擎,每个 EventLoop 线程都维护一个 Selector 选择器和任务队列 taskQueue。它主要负责处理 I/O 事件、普通任务和定时任务。
Netty 中推荐使用 NioEventLoop 作为实现类,那么 Netty 是如何实现 NioEventLoop 的呢?首先我们来看 NioEventLoop 最核心的 run() 方法源码,本节课我们不会对源码做深入的分析,只是先了解 NioEventLoop 的实现结构。
首先,会根据 hasTasks() 的结果来决定是执行 selectNow() 还是 select(oldWakenUp),这个应该好理解。如果有任务正在等待,那么应该使用无阻塞的 selectNow(),如果没有任务在等待,那么就可以使用带阻塞的 select 操作。 ioRatio 控制 IO 操作所占的时间比重: 如果设置为 100%,那么先执行 IO 操作,然后再执行任务队列中的任务。 如果不是 100%,那么先执行 IO 操作,然后执行 taskQueue 中的任务,但是需要控制执行任务的总时间。也就是说,非 IO 操作可以占用的时间,通过 ioRatio 以及这次 IO 操作耗时计算得出。 我们这里先不要去关心 select(oldWakenUp)、processSelectedKeys() 方法和 runAllTasks(…) 方法的细节,只要先理解它们分别做什么事情就可以了。
回过神来,我们前面在 register 的时候提交了 register 任务给 NioEventLoop,这是 NioEventLoop 接收到的第一个任务,所以这里会实例化 Thread 并且启动,然后进入到 NioEventLoop 中的 run 方法。 当然了,实际情况可能是,Channel 实例被 register 到一个已经启动线程的 NioEventLoop 实例中。
io.netty.channel.nio.NioEventLoop#run
```java @Override //死循环监听、处理事件 protected void run() { for (;;) { try { try { //hasTasks()判断是否有任务 tailTasks 和 taskQueue 是否为空 //如果有任务 返回的是io事件个数 那么直接进入default 什么也不做跳出switch //如果没有任务 返回的是SelectStrategy.SELECT //队列中有任务则调用selectNow返回当前已就绪IO事件的数量,否则继续select switch (selectStrategy.calculateStrategy (selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT:// 轮询 I/O 事件select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;//初始ioRatio是50final int ioRatio = this.ioRatio;//如果ioRatio是100 先处理就绪的ioif (ioRatio == 100) {try {//selector选择事件//判断事件类型//处理io事件processSelectedKeys();} finally {//确保我们总是可以执行任务//执行任务runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {// 处理 I/O 事件processSelectedKeys();} finally {//确保我们总是可以执行任务//但是这次执行任务是带有超时时间的final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}
```
上述源码的结构比较清晰,NioEventLoop 每次循环的处理流程都包含事件轮询 select、事件处理 processSelectedKeys、任务处理 runAllTasks 几个步骤,是典型的 Reactor 线程模型的运行机制。而且 Netty 提供了一个参数 ioRatio,可以调整 I/O 事件处理和任务处理的时间比例。下面我们将着重从事件处理和任务处理两个核心部分出发,详细介绍 Netty EventLoop 的实现原理。
结合 Netty 的整体架构,我们一起看下 EventLoop 的事件流转图,以便更好地理解 Netty EventLoop 的设计原理。NioEventLoop 的事件处理机制采用的是无锁串行化的设计思路。
- BossEventLoopGroup 和 WorkerEventLoopGroup 包含一个或者多个 NioEventLoop。BossEventLoopGroup 负责监听客户端的 Accept 事件,当事件触发时,将事件注册至 WorkerEventLoopGroup 中的一个 NioEventLoop 上。每新建一个 Channel, 只选择一个 NioEventLoop 与其绑定。所以说 Channel 生命周期的所有事件处理都是线程独立的,不同的 NioEventLoop 线程之间不会发生任何交集。
- NioEventLoop 完成数据读取后,会调用绑定的 ChannelPipeline 进行事件传播,ChannelPipeline 也是线程安全的,数据会被传递到 ChannelPipeline 的第一个 ChannelHandler 中。数据处理完成后,将加工完成的数据再传递给下一个 ChannelHandler,整个过程是串行化执行,不会发生线程上下文切换的问题。
NioEventLoop 无锁串行化的设计不仅使系统吞吐量达到最大化,而且降低了用户开发业务逻辑的难度,不需要花太多精力关心线程安全问题。
虽然单线程执行避免了线程切换,但是它的缺陷就是不能执行时间过长的 I/O 操作,一旦某个 I/O 事件发生阻塞,那么后续的所有 I/O 事件都无法执行,甚至造成事件积压。
在使用 Netty 进行程序开发时,我们一定要对 ChannelHandler 的实现逻辑有充分的风险意识。
java @Override public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { //判断是否有任务 //有任务唤醒 return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; }
``` /** * 核心思想:没有task要做时,select阻塞1s,如果有task,wakeup去做。 * @param oldWakenUp * @throws IOException */ private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); //按scheduled的task时间来计算select timeout时间。 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();if (nextWakeupTime != normalizedDeadlineNanos) {nextWakeupTime = normalizedDeadlineNanos;}for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) { //已经有定时task需要执行了,或者超过最长等待时间了if (selectCnt == 0) {//非阻塞,没有数据返回0selector.selectNow();selectCnt = 1;}break;}if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}//下面select阻塞中,别人唤醒也可以可以的int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The code exists in an extra method to ensure the method is not too big to inline as this// branch is not very likely to get hit very frequently.selector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}
}
```
6.空轮询bug解决
NioEventLoop 线程的可靠性至关重要,一旦 NioEventLoop 发生阻塞或者陷入空轮询,就会导致整个系统不可用。
在 JDK 中, Epoll 的实现是存在漏洞的,即使 Selector 轮询的事件列表为空,NIO 线程一样可以被唤醒,导致 CPU 100% 占用。这就是臭名昭著的 JDK epoll 空轮询的 Bug。
Netty 作为一个高性能、高可靠的网络框架,需要保证 I/O 线程的安全性。那么它是如何解决 JDK epoll 空轮询的 Bug 呢?
实际上 Netty 并没有从根源上解决该问题,而是巧妙地规避了这个问题。
我们抛开其他细枝末节,直接定位到事件轮询 select() 方法中的最后一部分代码,一起看下 Netty 是如何解决 epoll 空轮询的 Bug。
Netty中的解决思路: 对Selector()方法中的阻塞定时 select(timeMIllinois)操作的 次数进行统计,每完成一次select操作进行一次计数,若在循环周期内 发生N次空轮询,如果N值大于BUG阈值(默认为512),就进行空轮询BUG处理。 重建Selector,判断是否是其他线程发起的重建请求,若不是则将原SocketChannel从旧的Selector上去除注册,重新注册到新的 Selector上,并将原来的Selector关闭。 https://blog.csdn.net/qq_41884976/article/details/91913820
select方法分三个部分: //第一部分:超时处理逻辑 //第二部分:定时阻塞select(timeMillins) //第三部分: 解决空轮询 BUG long time = System.nanoTime(); //当前时间 - 循环开始时间 >= 定时select的时间timeoutMillis,说明已经执行过一次阻塞select() if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //说明发生过一次阻塞式轮询 重置次数 selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 如果空轮询的次数大于空轮询次数阈值 SELECTOR_AUTO_REBUILD_THRESHOLD(512) //1.首先创建一个新的Selecor //2.将旧的Selector上面的键及其一系列的信息放到新的selector上面。 selector = selectRebuildSelector(selectCnt); selectCnt = 1; break; }
Netty 提供了一种检测机制判断线程是否可能陷入空轮询,具体的实现方式如下:
- 每次执行 Select 操作之前记录当前时间 currentTimeNanos。
- time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,如果事件轮询的持续时间大于等于 timeoutMillis,那么说明是正常的,否则表明阻塞时间并未达到预期,可能触发了空轮询的 Bug。
- Netty 引入了计数变量 selectCnt。在正常情况下,selectCnt 会重置,否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTORAUTOREBUILD_THRESHOLD(默认512) 阈值时,会触发重建 Selector 对象。
Netty 采用这种方法巧妙地规避了 JDK Bug。异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector 上,重建完成之后异常的 Selector 就可以废弃了。
相关文章:
29.Netty源码之服务端启动:创建EventLoopSelector流程
highlight: arduino-light 源码篇:从 Linux 出发深入剖析服务端启动流程 通过前几章课程的学习,我们已经对 Netty 的技术思想和基本原理有了初步的认识,从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式…...
Kotllin实现ArrayList的基本功能
前言 上次面试时,手写ArrayList竟然翻车,忘了里面的扩容与缩容的条件,再次实现一次,加深印象 源码讲了什么 实现了List列表和RandomAccess随机访问接口List具有增删改查功能,RandomAccess支持下标访问内部是一个扩容…...
C++的初步介绍,以及C++与C的区别
C和C的区别 C又称C plus plus,且C语言是对C语言的扩充,几乎支持所有的C语言语法;C语言:面向过程的语言(注重问题的解决方法和算法)C:面向对象的语言 (求解的方法)面向对…...
JDK 核心jar之 rt.jar
一、JDK目录展示 二、rt.jar 简介 2.1.JAR释义 在软件领域,JAR文件(Java归档,英语:Java Archive)是一种软件包文件格式,通常用于聚合大量的Java类文件、相关的元数据和资源(文本、图片等&…...
el-form表单验证:只在点击保存时校验(包含select、checkbox、radio)
1、input类型 input类型 在el-input里加入:validate-event"false" <el-form-item label"活动名称" prop"name"><el-input v-model"ruleForm.name" :validate-event"false"></el-input> </el-form-i…...
Golang基本语法(上)
1. 变量与常量 Golang 中的标识符与关键字 标识符 Go语言中标识符由字母数字和_(下划线)组成,并且只能以字母和_开头。 举几个例子:abc, _, _123, a123。 关键字 关键字和保留字都不建议用作变量名: Go语言中有25个关键字。 此…...
jenkins使用
安装插件 maven publish over ssh publish over ssh 会将打包后的jar包,通过ssh推送到指定的服务器上,,在jenkins中设置,推送后脚本,实现自动部署jar包,, 装了这个插件之后,可以在项…...
多线程基础篇(包教包会)
文章目录 一、第一个多线程程序1.Jconsole观察线程2.线程休眠-sleep 二、创建线程三、Thread类及常见方法1. Thread 的常见构造方法2. Thread 的几个常见属性3. 启动线程 - start4. 中断线程5. 等待一个线程 四、线程状态五、线程安全问题(synchronized)(重点&#…...
Android/Java中,各种数据类型之间的互相转换,给出各种实例,附上中文注释
目录 1.字符串(String)转整数(int): 2.整数(int)转字符串(String): 3.字符串(String)转浮点数(float)&…...
机器学习知识点总结:什么是EM(最大期望值算法)
什么是EM(最大期望值算法) 在现实生活中,苹果百分百是苹果,梨百分白是梨。 生活中还有很多事物是概率分布,比如有多少人结了婚,又有多少人有工作, 如果我们想要调查人群中吸大麻者的比例呢?敏感问题很难得…...
漏洞挖掘和安全审计的技巧与策略
文章目录 漏洞挖掘:发现隐藏的弱点1. 源代码审计:2. 黑盒测试:3. 静态分析工具: 安全审计:系统的全面评估1. 渗透测试:2. 代码审计:3. 安全策略审查: 代码示例:SQL注入漏…...
[SpringBoot3]Web服务
五、Web服务 基于浏览器的B/S结构应用十分流行。SpringBoot非常适合Web应用开发,可以使用嵌入式Tomcat、Jetty、Undertow或Netty创建一个自包含的HTTP服务器。一个SpringBoot的Web应用能够自己独立运行,不依赖需要安装的Tomcat、Jetty等。SpringBoot可以…...
构建系统自动化-autoreconf
autoreconf简介 autoreconf是一个GNU Autotools工具集中的一个命令,用于自动重新生成构建系统的配置脚本和相关文件。 Autotools是一组用于自动化构建系统的工具,包括Autoconf、Automake和Libtool。它们通常用于跨平台的软件项目,以便在不同…...
Mysql之InnoDB和MyISAM的区别
InnoDB和MyISAM是MySQL数据库中两种常见的存储引擎,它们在功能和性能方面有一些明显的区别。下面是它们之间的详细解释和说明: 底层数据 存数据的时候,MyISAM是数据和索引分开存储,分为MYD和MYI 而InnoDB是数据即索引࿰…...
Unity 之 Transform.Translate 实现局部坐标系中进行平移操作的方法
文章目录 Translate 默认使用局部坐标也可以转换成世界坐标 Translate 默认使用局部坐标 在Unity中,Transform.Translate是用于在游戏对象的局部坐标系中进行平移操作的方法。这意味着它将游戏对象沿着其自身的轴进行移动,而不是世界坐标轴。这在实现物…...
PostgreSQL Error: sorry, too many clients already
Error PG的默认最大连接数是100. 如果超过100就会报错sorry, too many clients already Find show max_connections; SELECT COUNT(*) from pg_stat_activity; SELECT * FROM pg_stat_activity;Solution 提高最大连接数 ALTER SYSTEM SET max_connections 然后重启pg查看…...
Vue2(路由)
目录 一,路由原理(hash)二,路由安装和使用(vue2)三,路由跳转四,路由的传参和取值五,嵌套路由六,路由守卫最后 一,路由原理(hash&#…...
中介者模式-协调多个对象之间的交互
在深圳租房市场,有着许多的“二房东”,房主委托他们将房子租出去,而租客想要租房的话,也是和“二房东”沟通,租房期间有任何问题,找二房东解决。对于房主来说,委托给“二房东”可太省事了&#…...
Python框架【自定义过滤器、自定义数据替换过滤器 、自定义时间过滤器、选择结构、选择练习、循环结构、循环练习、导入宏方式 】(三)
👏作者简介:大家好,我是爱敲代码的小王,CSDN博客博主,Python小白 📕系列专栏:python入门到实战、Python爬虫开发、Python办公自动化、Python数据分析、Python前后端开发 📧如果文章知识点有错误…...
红黑树遍历与Redis存储
引言 在计算机科学领域,红黑树(Red-Black Tree)是一种自平衡的二叉查找树,它能在O(log n)的时间复杂度内完成插入、删除和查找操作。由于其高效性和可预测性的性能,红黑树在许多领域都得到广泛应用。本文将重点介绍红…...
前端处理图片文件的方法
在项目开发过程中,有一个需求,需要前端对上传的图片进行处理,以字符串的形式传给后端,实现效果如下: 1.上传图片的组件 在该项目中,使用了element plus组件库 <el-uploadv-model:file-list"fileL…...
「Java」《深入解析Java多线程编程利器:CompletableFuture》
《深入解析Java多线程编程利器:CompletableFuture》 一、 引言1. 对多线程编程的需求和挑战的介绍2. 介绍CompletableFuture的作用和优势 二. CompletableFuture简介1. CompletableFuture是Java中提供的一个强大的多线程编程工具2. 与传统的Thread和Runnable相比的优…...
Docker容器与虚拟化技术:容器运行时说明与比较
目录 一、理论 1.容器运行时 2.容器运行时接口 3.容器运行时层级 4.容器运行时比较 5.强隔离容器 二、问题 1.K8S为何难以实现真正的多租户 三、总结 一、理论 1.容器运行时 (1)概念 Container Runtime 是运行于 k8s 集群每个节点中ÿ…...
vue导出文件流获取附件名称并下载(在response.headers里解析filename导出)
导出文件流下载,拦截器统一处理配置 需求以往实现的方法(各自的业务层写方法)现在实现的方法(axios里拦截器统一配置处理)把文章链接复制粘贴给后端,让大佬自己赏阅。 需求 之前实现的导出都是各自的业务层…...
山东省图书馆典藏《乡村振兴战略下传统村落文化旅游设计》鲁图中大许少辉博士八一新书
山东省图书馆《乡村振兴战略下传统村落文化旅游设计》鲁图中大许少辉博士八一新书...
2023-08-19力扣每日一题-水题/位运算解法
链接: 2235. 两整数相加 题意: ab 解: ab 补一个位运算写法,进位是(a&b)<<1,不进位的计算结果为a^b 实际代码: #include<iostream> using namespace std; int sum(int num1, int n…...
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)
Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四) 4.1 概述1)压缩的好处和坏处2)压缩原则 4.2 MR 支持的压缩编码4.3 压缩方式选择4.3.1 Gzip 压缩4.3.2 Bzip2 压缩4.3.3 Lzo 压缩4.3.4 Snappy 压缩4.3.5 压缩…...
LRU淘汰策略执行过程
1 介绍 Redis无论是惰性删除还是定期删除,都可能存在删除不尽的情况,无法删除完全,比如每次删除完过期的 key 还是超过 25%,且这些 key 再也不会被客户端访问。 这样的话,定期删除和堕性删除可能都彻底的清理掉。如果…...
Kotlin 高阶函数详解
高阶函数 在 Kotlin 中,函数是一等公民,高阶函数是 Kotlin 的一大难点,如果高阶函数不懂的话,那么要学习 Kotlin 中的协程、阅读 Kotlin 的源码是非常难的,因为源码中有太多高阶函数了。 高阶函数的定义 高阶函数的…...
DL——week2
要学明白的知识点: np.dot()的作用 两个数组的点积,即对应元素相乘 numpy.dot(a,b,outNone) a: ndarray 数组 b: ndarray 数组 out: ndarray, 可选,用来保存dot()的计算结果 numpy Ndarray对象 N维数组对象ndarray&am…...
自做装逼头像网站/关键词推广优化app
TCP主动关闭连接 appl: close(), --> FIN FIN_WAIT_1 //主动关闭socket方,调用close关闭socket,发FIN <-- ACK FIN_WAIT_2 //对方操作系统的TCP层,给ACK响应。然后给FIN <-- FIN …...
网站建设的合同书/网络营销策略制定
一、前言为了方便小公司没有运维开发人员,利用Jenkin解决了繁琐的打包部署问题。这次我就写了一个Gogs的集成教程,我觉的Gogs私服比较简单,其他的GitLab、svn、GitHub基本上也是一样的,搭建好了,开发人员只需要提交到版…...
网站设计到底做多宽/搭建一个app平台要多少钱
2018年12月更新(12个月后):原始字符串文字(在琥珀色列表中)不会进入JDK 12。看看这里的批评。Java的未来版本可能存在(10个或更多)。从2018年1月开始参见JEPS 8196004 :(“JEP”是“JDK增强计划”)JEP草案:原始字符串文字将一种新的文字(原始字符串文字)…...
哪个网站做脚本/抖音seo关键词优化怎么做
chown (change owner) 更改所有者 查看系统所有用户 这里面都是系统默认的用户 [rootevan-01 ~]# cat /etc/passwd root:x:0:0:root:/root:/bin/bash bin:x:1:1:bin:/bin:/sbin/nologin daemon:x:2:2:daemon:/sbin:/sbin/nologin adm:x:3:4:adm:/var/adm:/sbin/nologin lp:x…...
个人网站需要买服务器吗/个人网站制作源代码
最近使用mediaelementjs做一个iPad上的Html5的video标签的播放器包装. 首先感谢一下mediaelementjs这样的开源项目, 可用度极高, 代码质量明显比我自己写要好多了, 模块化清晰, 许可证很开放(MIT). 开发的过程中遇到了些浏览器兼容问题, 也涉及到一下iPad这样的平板平板设备上…...
网站联系我们模板/青岛做网站推广
java虚拟机默认的编码是unicode指的是内存中的编码是unicode,而进行io(包括网络和硬盘)传输时另外编码,通过 System.getProperty("file.encoding")查看,通常,默认为ansi,不过通过ecli…...