Netty中ServerBootstrap类介绍
一、Netty基本介绍
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
本文主要介绍Netty中的核心类之一的:ServerBootstrap类。
ServerBootstrap是服务端的启动配置类,主要用于绑定我们创建的 EventLoopGroup,指定 Channel 的类型以及绑定 Channel 处理器等操作,主要做的都是给属性赋值操作,所以称其为配置类。Bootstrap 则是客户端的启动配置类。
二、继承体系
三、ServerBootstrap类
我们先看下代码入口:
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(parentGroup,childGroup) // 指定eventLoopGroup.channel(NioServerSocketChannel.class) // 指定使用NIO进行通信.childHandler(new SimpleChannelInitializer())// 指定childGroup中的eventLoop所绑定的线程所要处理的处理器.attr(AttributeKey.newInstance("parentAttr"),1).childAttr(AttributeKey.newInstance("childAttr"),2).option(ChannelOption.valueOf("parentOp"),3).childOption(ChannelOption.valueOf("childOp"),4);// 指定当前服务器所监听的端口号// bind()方法的执行是异步的// sync()方法会使bind()操作与后续的代码的执行由异步变为了同步ChannelFuture future = bootstrap.bind(5055).sync();// 关闭Channel// closeFuture()的执行是异步的。// 当Channel调用了close()方法并关闭成功后才会触发closeFuture()方法的执行future.channel().closeFuture().sync();
} catch (Exception e) {e.printStackTrace();
} finally {parentGroup.shutdownGracefully();childGroup.shutdownGracefully();
}
ServerBootstrap初始化调用了无参构造器,并没有具体逻辑,我们看下group()方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;
}
这里主要是做一些属性填充的工作。 channel()方法和childHandler()方法也是类似的。
直接从bootstrap.bind()方法进去:
// io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));
}// 继续跟进
public ChannelFuture bind(SocketAddress localAddress) {// 验证group与channelFactory是否为nullvalidate(); if (localAddress == null) {throw new NullPointerException("localAddress");}return doBind(localAddress);
}
跟进去doBind()方法
private ChannelFuture doBind(final SocketAddress localAddress) {// 创建、初始化channel,并将其注册到selector,返回一个异步结果final ChannelFuture regFuture = initAndRegister();// 从异步结果中获取channelfinal Channel channel = regFuture.channel();// 若异步操作执行过程中出现了异常,则直接返回异步对象(直接结束)if (regFuture.cause() != null) {return regFuture;}// 处理异步操作完成的情况(可能是正常结束,或发生异常,或任务取消,这些情况都属于有结果的情况)if (regFuture.isDone()) {ChannelPromise promise = channel.newPromise();// 绑定指定的端口doBind0(regFuture, channel, localAddress, promise);return promise;} else { // 处理异步操作尚未有结果的情况final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 为异步操作添加监听regFuture.addListener(new ChannelFutureListener() {// 若异步操作具有了结果(即完成),则触发该方法的执行@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) { // 异步操作执行过程中出现了问题promise.setFailure(cause);} else { // 异步操作正常结果promise.registered();// 绑定指定的端口doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}
这里重点关注两个方法:initAndRegister()和doBind0(),在此之前,先了解下ChannelPromise 与 ChannelFuture
1.ChannelPromise 与 ChannelFuture
ChannelFuture 只可以查询当前异步操作的结果,不可以修改当前异步结果的 Future。ChannelPromise 可以修改当前异步结果的状态,并且在修改状态是会触发监听器。在doBind()方法中主要用于在处理异步执行一直未结束的的操作,将异步结果存在异常的时,将异常赋值给 ChannelPromise 并返回。
2.initAndRegister()方法
跟进去initAndRegister()方法,该方法主要是初始化并创建channel
final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建channelchannel = channelFactory.newChannel();// 初始化channelinit(channel);} catch (Throwable t) {if (channel != null) {channel.unsafe().closeForcibly();return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 将channel注册到selectorChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;
}
2.1创建channel
跟进去channelFactory.newChannel()方法
@Override
public T newChannel() {try {return constructor.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);}
}
我们在设置channel类型的时候设置的是NioServerSocketChannel.class,所有我们跟进到NioServerSocketChannel的构造器中
// NIO中的provider,其用于创建selector与channel。并且是单例的
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
先跟进去newSocket,随后跟进去this(ServerSocketChannel )方法
private static ServerSocketChannel newSocket(SelectorProvider provider) {try { // 创建NIO原生的channel => ServerSocketChannelreturn provider.openServerSocketChannel();} catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);}
}
跟进去this(ServerSocketChannel )方法
public NioServerSocketChannel(ServerSocketChannel channel) {// 参数1:父channel// 参数2:NIO原生channel// 参数3:指定当前channel所关注的事件为 接受连接super(null, channel, SelectionKey.OP_ACCEPT);// 用于对channel进行配置的属性集合config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
new NioServerSocketChannelConfig(),就是给当前 Channel 的 config 进行赋值,用来保存当前 Channel 的属性配置的集合
接着跟主线,跟进去super()
// io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);
}// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);// 这里的this.ch为NIO原生channelthis.ch = ch;this.readInterestOp = readInterestOp;try {// NIO,非阻塞ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", e2);}}throw new ChannelException("Failed to enter non-blocking mode.", e);}
}
接着跟进去super()
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {this.parent = parent;// 为channel生成id,由五部分构成id = newId();// 生成一个底层操作对象unsafeunsafe = newUnsafe();// 创建与这个channel相绑定的channelPipelinepipeline = newChannelPipeline();
}
创建channel主要做了以下的事情:
- 创建一个原生的NioChannel
- 将原生channel设置为非阻塞
- 将readInterestOp设置为SelectionKey.OP_ACCEPT(接受连接)
- 位channel生成一个id
- 生成unsafe对象
- 创建与channel绑定的pileline
2.2初始化channel
跟进去init(Channel)方法
void init(Channel channel) throws Exception {// 获取serverBootstrap中的options属性final Map<ChannelOption<?>, Object> options = options0();// 将options属性设置到channelsynchronized (options) {setChannelOptions(channel, options, logger);}// 获取serverBootstrap中的attrs属性final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {// 遍历attrs属性for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();// 将当前遍历的attr初始化到channelchannel.attr(key).set(e.getValue());}}// 获取channel的pipelineChannelPipeline p = channel.pipeline();// 将serverBootstrap中所有以child开头的属性写入到局部变量,// 然后将它们初始化到childChannel中final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 将ServerBootstrapAcceptor处理器添加到pipeline// ServerBootstrapAcceptor处理器用于接收ServerBootstrap中的属性值,// 我们通常称其为连接处理器pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}
初始化方法先对attr和option进行填充赋值,attr和option的来源是在初始化的时候设置的值
这些值在后续的传递中可以拿到
Attribute<Object> childAttr = channel.attr(AttributeKey.valueOf("childAttr"));
childAttr.get();
Attribute<Object> parentAttr = channel.attr(AttributeKey.valueOf("parentAttr"));
parentAttr.get();
Object childOp = channel.config().getOption(ChannelOption.valueOf("childOp"));
Object parentOp = channel.parent().config().getOption(ChannelOption.valueOf("parentOp"));
这里使用局部变量记录了所有 Child 相关的值 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs 主要用于初始化 childChannel 的属性,new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)) 主要是创建 连接处理器。
p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 将ServerBootstrapAcceptor处理器添加到pipeline// ServerBootstrapAcceptor处理器用于接收ServerBootstrap中的属性值,// 我们通常称其为连接处理器pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}
});
首先这里想做的事情是:将当前 channel 的 pipeline 中绑定一个初始化处理器 ChannelInitializer ,主要的操作是处理 childGroup 里面的 channel 的初始化操作
这里因为 ServerBootstrap 服务端是对用的有两个 EventLoopGroup,在服务端,parentGroup 是用于接收客户端的连接,在 parentGroup 接收到连接之后是将只是将当前转给了 childGroup去处理后续操作,而 childGroup 是用来专门处理连接后的操作的,不关心 channel 的连接任务。这个其实就是 Netty-Server 的 Reactor 线程池模型的处理逻辑。
2.2.1ServerBootstrapAcceptor (childGroup 里面的 channel 的初始化)
进入ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs))方法
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;private final Runnable enableAutoReadTask;ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;// See https://github.com/netty/netty/issues/1328enableAutoReadTask = new Runnable() {@Overridepublic void run() {channel.config().setAutoRead(true);}};}
}
可以看到ServerBootstrapAcceptor继承了ChannelInboundHandlerAdapter类,而构造方法只是将 ServerBootstrap 中配置的 Child 属性设置保存下来。而这里一直说这是连接处理器,是因为当客户端连接发送到服务端时,这个处理器会接收客户端的连接并处理。
主要是处理方法是 channelRead 中的实现:
public void channelRead(ChannelHandlerContext ctx, Object msg) {// msg为客户端发送来的数据,其为NioSocketChannel,即子channel,childChannelfinal Channel child = (Channel) msg;// 将来自于ServerBootstrap的child开头属性初始化到childChannel中(childHandler、childOptions、childAttrs)child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {// 将childChannel注册到selector 需要注意的是,这里的selector与父channel所注册的selector不是同一个childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
}
这里主要做了两件事:
- 初始化 childChannel
- 将成功从 client 连接过来的 channel 注册到 selector 上。
补充说明:
Server 端的处理上 netty 线程模型采用“服务端监听线程”和“IO线程”分离的方式。所以这里 channelRead 方法就是在 client 端请求连接到 server 端时,用于将当前连接的 IO 线程绑定到 childChannel 同时注册到 ChildGroup 中的 Selector 中。
总结就是,parentGroup处理连接,childGroup处理具体的逻辑,也就是我们添加的到pileline上的各个handler。
2.3将channel注册到selector
ChannelFuture regFuture = config().group().register(channel);
这里的group是parentGroup,我们创建的是NioEventLoopGroup,所以可以找到其对应的register()方法的实现类为MultithreadEventLoopGroup,跟进去:
public ChannelFuture register(Channel channel) {// next() 从eventLoop数组中选择一个eventLoopreturn next().register(channel);
}
根据NioEventLoopGroup的继承体系,进入SingleThreadEventLoop的实现:
public ChannelFuture register(Channel channel) {// 创建一个 ChannelPromise 然后注册return register(new DefaultChannelPromise(channel, this));
}// ----> 这里继续调用 unsafe 的 register
public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;
}
跟进去AbstractChannel的register()方法:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 对异常情况的处理if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}// channel与eventLoop的绑定就发生在这里,// 需要注意,这里的eventLoop还没有绑定线程,因为这个线程还没有创建AbstractChannel.this.eventLoop = eventLoop;// 判断当前线程与eventLoop所绑定线程是否是同一个线程if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 执行当前线程所绑定的eventLoop的execute(), 这个execute()会将参数任务写入到任务队列,并创建启动新的线程eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
跟进去register0(promise)方法
private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister(); // 绑定neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}
跟进去AbstractNioChannel的doRegister()方法:
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 在这里进行了注册,将NIO原生channel注册到了NIO原生selectorselectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {eventLoop().selectNow();selected = true;} else {throw e;}}}
}
这里就是 channel 注册 Selector 的代码:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
- javaChannel() :这里获取原生的 Nio Channel,跟进去可以找到这里返回的是 AbstractNioChannel#ch 的 channel。在前面 NioEventGroup 初始化的时候说过这个 NIO Channel 的初始化的过程。
- 然后调用 NIO Channel 的 Regsiter 方法
- Regsiter 方法中首先传入的是 unwrappedSelector 前面初始化的 selector 数组。第二个参数 0 ,就是当前监听的的事件, 0 表示不关注任何事件。为什么这里子 Channel 注册的是不关注任何事件? 在前面看到的 Channel 注册一个指定的关注事件:SelectionKey.OP_ACCEPT 连接事件,那个 channel 是 Netty 封装的 channel,哪里监听了连接事件之后,只要关注客户端的连接,当 netty 封装的 channel 获取到连接就绪的 channel 的时候就可以拿到当前 channel 需要注册事件了,然后这个时候就可以指定 原生 NIO channel 的需要关注的事件。所以这里默认不关注任何事件就是为后续修改其需要关注指定类型的就绪事件。
接着看看AbstractChannel的register() 的eventLoop.excute()逻辑
// 在上面register的方法中
eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}
});
跟进去execute()方法:
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 判断当前线程与eventLoop所绑定线程是否是同一个boolean inEventLoop = inEventLoop();// 将任务添加到任务队列addTask(task);if (!inEventLoop) {// 创建并启动一个线程startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}
}
这里会执行startThread()方法,跟进去:
private void startThread() {// 若当前eventLoop所绑定线程尚未启动if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {// 创建并启动一个线程doStartThread();} catch (Throwable cause) {STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}
}
首先判断当前 eventLoop 所绑定线程尚未启动,然后使用 CAS 修改当前线程的启动状态 ,修改成功则执行doStartThread()方法,跟进去:
private void doStartThread() {assert thread == null;// 调用NioEventLoop所包含的executor的execute()// 这个execute()会创建并启动一个线程executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {// 执行了一个不会停止的for,用于完成任务队列中的任务SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// 省略......}});
}
跟进去SingleThreadEventExecutor.this.run()方法:
protected void run() {for (;;) {try {try {// 选择就绪的channelswitch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE: //-2 NioEventLoop不支持continue;case SelectStrategy.BUSY_WAIT: // -3 NioEventLoop不支持case SelectStrategy.SELECT: // -1 能走到这里,说明当前任务队列中没有任务// 进行阻塞式选择select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;// 该变量用于设置“处理就绪channel的IO所使用的时间”与“处理任务队列中任务使用时间”的比例 该值为整型,不大于100final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {runAllTasks();}} else {// 记录处理就绪channel的IO开始执行的时间点final long ioStartTime = System.nanoTime();try {// 处理就绪channel的IOprocessSelectedKeys();} finally {// 计算出处理就绪channel的IO所使用的时长final long ioTime = System.nanoTime() - ioStartTime;// 执行任务队列中的任务runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}// 省略。。。}
}
这里关注4个部分
- selectStrategy.calculateStrategy
- switch-case
- processSelectedKeys()
- runAllTasks()
1.selectStrategy.calculateStrategy()
先看下hasTasks()方法
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());// hasTasks tailTasks 收尾任务队列
protected boolean hasTasks() {return super.hasTasks() || !tailTasks.isEmpty();
}// super.hasTasks() taskQueue 普通任务队列
protected boolean hasTasks() {assert inEventLoop();return !taskQueue.isEmpty();
}
这里返回当前任务队列和收尾队列是否有任务
继续跟进去calculateStrategy()方法:
// SelectStrategy.SELECT = -1
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
IntSupplier 是匿名内部类,跟进去selectSupplier.get()方法
public int get() throws Exception {return selectNow();
}// io.netty.channel.nio.NioEventLoop#selectNow
int selectNow() throws IOException {try {return selector.selectNow();} finally {// restore wakeup state if neededif (wakenUp.get()) {selector.wakeup();}}
}
- selector.selectNow() : 方法为 NIO 的非阻塞选择,返回就绪的 channel 的数量,可以为 0。
- 补充:Selector 的阻塞选择和非阻塞选择的区别就是,非阻塞选则在当前 select 方法执行时判断循环判断所有的 channel 是否就绪并返回所有的就绪数量,而阻塞式选择则是阻塞指定时间直至阻塞时间内获取到就绪 channel 或者阻塞时间超时时立刻返回。
- wakenUp.get() : 返回当前线程是否被阻塞,没有被阻塞时返回 true,当前线程被阻塞返回 false。
- selector.wakeup() :当前线程如果被阻塞,则立刻返回 selector 结果,即唤醒当前线程。
- 这里 selectNow() 方法执行的结果,是一个必然大于等于 0 的结果。
calculateStrategy方法总结:如果任务队列存在任务,则通过 Selector 执行非阻塞选择返回就绪的 channel 数量,如果不存在任务,则直接返回 -1。
2.看下switch-case内容
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {// 省略... case SelectStrategy.SELECT: // -1 能走到这里,说明当前任务队列中没有任务// 进行阻塞式选择select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:
}
当所有任务队列中都没有任务的时候才会返回 -1。也就意味着当任务队列中没有任务时也会景行一次阻塞式选择,通过 wakenUp.getAndSet(false)
方法将当前线程设置为阻塞状态。然后就阻塞式 select。
看下select()方法逻辑:
private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {// 计数器:用于记录空轮询导致CPU占用率飙升,select()提前结束的次数(其值大于1时)int selectCnt = 0;// 获取当前时间,也就是for循环第一次开始执行的时间点long currentTimeNanos = System.nanoTime();// delayNanos() 表示定时任务队列中第一个定时任务还有多久就到开始执行的时间了long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {// 处理小于0.5毫秒的任务long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) { // 该条件为true,表示具有立即需要执行的定时任务if (selectCnt == 0) { // 只有第一次for循环才会执行下面的“非阻塞选择”selector.selectNow();selectCnt = 1;}break;}if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;// 若有就绪的channel了,则直接结束if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}// 若当前线程被中断if (Thread.interrupted()) {selectCnt = 1;break;}// 获取当前时间long time = System.nanoTime();// 下面的式子等价于: time - currentTimeNanos >= timeoutMillis// 若下面的条件成立,则说明select()是在指定的阻塞时间过期后才跳出的,即正常结束的if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {selector = selectRebuildSelector(selectCnt); // 重构selectorselectCnt = 1;break;}// 本轮for循环结束时间点,同时也是下一轮for循环的开始时间点currentTimeNanos = time;} } catch (CancelledKeyException e) {}
}
switch-case 唯一的代码逻辑也就是在任务队列中没有任务时执行的阻塞 select,而在其他的任何情况下或者阻塞选择存在就绪 channel 或者任务队列新增任务之后都会跳出 switch - case,执行后续逻辑。
3.processSelectedKeys()
进入processSelectedKeys()方法:
private void processSelectedKeys() {// 判断channel的selectedKeys是否是优化过的if (selectedKeys != null) {processSelectedKeysOptimized(); // 优化处理方式} else {processSelectedKeysPlain(selector.selectedKeys()); // 普通处理方式}
}
优化部分的在《Netty中NioEventLoop介绍》中有讲过了,就是将selectedKeys 的 set 集合转换成了数组。
跟进去processSelectedKeysOptimized()方法:
private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {// 从数组中取出一个元素final SelectionKey k = selectedKeys.keys[i];// 移除已经取出的 SelectionKey,使 GC 可以处理到已经关闭的 channelselectedKeys.keys[i] = null;// 获取selectionKey的附件,该附件中可以存放任意数据,不过这里存放的是NIO原生channelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a); // 处理就绪事件} else {NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task); // 这里是测试代码。跟进去可以看到实现方法是测试类}
// 省略......
跟进去processSelectedKey()方法:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// 处理selectionKey失效的情况if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {return;}if (eventLoop != this || eventLoop == null) {return;}unsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// 判断当前 channnel 就绪的事件类型if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// 获取当前selectionKey的interestOpsint ops = k.interestOps();// 先将SelectionKey.OP_CONNECT按位取或,再与ops进行按位与ops &= ~SelectionKey.OP_CONNECT;// 将修改过的ops再写入到selectionsKey中k.interestOps(ops);// 连接serverunsafe.finishConnect();}// 处理写就绪的情况if ((readyOps & SelectionKey.OP_WRITE) != 0) {// 强制刷新(将user buffer中的数据写入到网关缓存)ch.unsafe().forceFlush();}// readyOps为0表示当前没有任何channel就绪if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 将网卡缓存中的数据写入到user bufferunsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}
这就是完整的 IO 处理逻辑,主要根据当前 channel 关注的事件进行相应的 unsafe 操作。
4.runAllTasks()
看下runAllTasks:
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);protected boolean runAllTasks(long timeoutNanos) {// 从定时任务队列中取出所有当前马上就要到期的定时任务放入到任务队列fetchFromScheduledTaskQueue();// 从任务队列中取出一个任务Runnable task = pollTask();// 若该任务为空,则说明任务队列中已经没有任务了,此时就可以执行收尾任务了if (task == null) {// 执行收尾队列中的收尾任务afterRunningAllTasks();return false;}final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;// 计数器long runTasks = 0;long lastExecutionTime;for (;;) {// 执行任务safeExecute(task);runTasks ++;// 每64个任务查看一次超时if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}// 从任务队列中再取出一个任务task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}} // end-for// 处理收尾队列中的任务afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;
}
到这里,initAndRegister()方法就介绍完成了,接着往下看
3.doBind0() 绑定端口号
进入doBind0()方法
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) { // 只有当channel初始化注册成功后,才会进行绑定channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}
进入bind()方法
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);
}// 接着进入bind()方法public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);
}
继续进入bind()方法:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {if (localAddress == null) {throw new NullPointerException("localAddress");}if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeBind(localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeBind(localAddress, promise);}}, promise, null);}return promise;
}
进入invokeBind()方法:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {bind(localAddress, promise);}
}
接着跟进bind()方法,实现为io.netty.channel.DefaultChannelPipeline.HeadContext
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);
}
接着跟进去bind()方法,实现类io.netty.channel.AbstractChannel.AbstractUnsafe
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// 省略。。。// 获取当前channel是否被激活。注意,现在还没有被激活,所以其值为falseboolean wasActive = isActive();try {// 绑定doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive(); // 触发重写的channelActivate方法的执行}});}safeSetSuccess(promise);
}
进入doBind()方法,实现类NioServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}
javaChannel() 即获取 NIO 原生 channel 的方法,再获取到 NIO 原生 channel 之后调用 bind 方法完成绑定。
这里涉及了pileline的一些操作,在这里不展开
总结
这里介绍了ServerBootstarp类实例化,属性设置以及bind端口的一些操作,Bootstrap类的流程和ServerBootstarp类似,但简单一些,后面又时间再整理。
参考博文:博文地址
相关文章:
Netty中ServerBootstrap类介绍
一、Netty基本介绍 Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。 Netty 是一…...
数字图像处理实验报告
目录 实验二、图像在空间域上的处理方法 实验三、图像在频率域上的处理方法 实验二、图像在空间域上的处理方法 一、实验目的 了解图像亮(灰)度变换与空间滤波的意义和手段;熟悉图像亮(灰)度变换与空间滤波的MATLA…...
【C51】10-基础51单片机的小车项目(51完结)
10.1小车的安装 10.2电机模块的开发(L9110S) 接通 VCC , GND 模块电源指示灯亮, 以下资料来源官方,但是不对,根据下节课实际调试 IA1 输入高电平, IA1 输入低电平,【 OA1 OB1 】电…...
进程、线程、锁阶段总结汇总
目录 进程 线程 锁 由于进程线程和锁的方面比较陌生,并且繁杂,所以简单总结一下学习到的函数API 进程 子进程创建 fork(); 进程结束 exit(); 进程回收 wait(); 进程回收 waitpad(); //函数可以指定进程组中的任意子进程,可以设置特殊…...
Filters.jar图片转素描
链接:https://pan.baidu.com/s/1ATlC2l1I83TPYFomHiWuFg?pwd2vm5 提取码:2vm5...
将MSYS2 MinGW集成到Windows终端
微软开发了一款Windows终端的开源软件,非常好用。安装后在Win7及以上系统会在右键菜单中添加一条“在终端中打开”的命令,非常方便。它默认配置了Windows命令行以及PowerShell,如果安装了Visual Studio 2022还会配置Visual Studio 2022的命令…...
SpringBoot项目使用slf4j的MDC日志打点功能
SpringBoot项目使用slf4j的MDC日志打点功能 物料准备: 1.自定义1个线程MDC打点工具类 2.配置logback打印MDC打点的traceId 3.配置webMVC使用MDC打点 4.配置ThreadPoolTaskExecutor使用MDC打点 5.配置HttpClient使用MDC打点 6.测试MDC日志打点效果 线程mdc打…...
宝塔修改默认端口后面板打不开
1、查看防火墙开启的端口,发现没有开启8888 [rootVM-12-12-centos ~]# firewall-cmd --list-ports 20/tcp 21/tcp 22/tcp 80/tcp 888/tcp 8081/tcp 39000-40000/tcp 8081/udp 2、防火墙开启8888端口 [rootVM-12-12-centos ~]# firewall-cmd --zonepublic --add-por…...
tinkerCAD案例:3.基本按钮
基本按钮 在本课中,您将学习制作具有圆柱形状的基本按钮。 说明 将圆柱体拖动到工作平面。 将其缩小到 2 毫米的高度。 提示: 您可以使用圆柱形状顶部的白点缩小圆柱体。 将其缩小到直径 16 毫米。 这将是按钮的主要形状。 现在我们可以创建允许将纽…...
客户线上反馈:从信息搜集到疑难 bug 排查全流程经验分享
写在前面:本文是我在前端团队的第三次分享,应该很少会有开发者写客户反馈处理流程以及 bug 排查的心得技巧,全文比较长,写了一个多星期大概1W多字(也是我曾经2年工作的总结),如果你有耐心阅读&a…...
悲观锁、乐观锁、自旋锁
悲观锁、乐观锁、自旋锁 (1)乐观锁 乐观锁是一种乐观的思想,即认为读多写少,遇到并发的可能性低,每次拿数据时都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有…...
七、进程地址空间
一、环境变量 (一)概念 环境变量(environment variables):系统当中用做特殊用途的系统变量。 如:我们在编写C/C代码的时候,在链接的时候,从来不知道我们的所链接的动态静态库在哪里,但是照样可…...
浅谈智能微电网供电系统的谐波治理
摘要:智能微电网供电系统的特性容易引发谐波,而谐波导致电力损耗加大,降低供电质量。本文从谐波的产 生原因和危害做出详细阐述,并结合智能微电网提出了治 理谐波的方法和措施。 关键词:智能微电网;谐波危害…...
springboot项目的社区/博客系统
课前导读: 你学完一篇,你就多会一项技能,多多少少对你还是有点帮助的不是吗?~~~ 这是博主网页的url:优文共享社区 开发环境:JDK1.8,IDEA2021,MySQL5.7,Windows11 开发技术…...
go语言基础——函数、结构体、接口
由于go不是一门面向对象的语言,因此在有一些特性上和java是有一些区别的,比如go中就没有类这样的概念。下面来介绍一下go的一些特性。 结构体 结构体类似与java中的类,但又不完全一样。在类中,可以定义字段和方法,但…...
项目集管理—项目集治理
一、概述 项目集治理是实现和执行项目集决策,为支持项目集而制定实践,并维持项目集监督的绩效领域。 本章包括: 项目集治理实践项目集治理角色项目集治理设计与实施 项目集治理包括为了满足组织战略和运营目标的要求,对项目集实…...
MySQL了解之复制(一)
1.1、复制解决的问题 数据复制技术有以下一些特点: (1) 数据分布 (2) 负载平衡(load balancing) (3) 备份 (4) 高可用性(high availability)和容错 1.2、复制如何工作 从高层来看,复制分成三步: (1) master将改变记录到二进制…...
Halcon得出三角形内切圆
Halcon得出三角形内切圆 news2023/5/27 7:14: 目录 一、得出三角形的三个角点二、用类似尺规作图法得出三角形圆心 1、以三角形三角点画出圆形轮廓2、求出三角形轮廓与圆形轮廓之间的交点3、获得角平分线,三边角平分线交点为圆心三、求出圆心到边最短距离即半径 …...
2023年6月北京/广州/深圳CDGA/CDGP数据治理认证招生
DAMA认证为数据管理专业人士提供职业目标晋升规划,彰显了职业发展里程碑及发展阶梯定义,帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力,促进开展工作实践应用及实际问题解决,形成企业所需的新数字经济下的核心职业…...
KMP 算法(Knuth-Morris-Pratt)
tip:作为程序员一定学习编程之道,一定要对代码的编写有追求,不能实现就完事了。我们应该让自己写的代码更加优雅,即使这会费时费力。 推荐:体系化学习Java(Java面试专题) 文章目录 一、什么是 …...
Java泛型详解
泛型的理解 泛型的概念 所谓泛型,就是允许在定义类、接口时通过一个标识表示类中某个属性的类型 或者是 某个方法的返回值类型及参数类型。这个类型参数将在使用时(例如,继承或实现这个接口,用这个类型声明变量、创建对象时&#…...
2023上海国际嵌入式展 | 如何通过人工智能驱动的自动化测试工具提升嵌入式开发效率
2023年6月14日到16日,龙智将在2023上海国际嵌入式展(embedded world China 2023)A055展位亮相。同时,6月14日下午3:00-3:30,龙智资深DevSecOps顾问巫晓光将于创新技术及应用发展论坛第二论坛区(A325展位&am…...
微信小程序个人心得
首先从官方文档给的框架说起,微信小程序官方文档给出了app.js, app.json, app.wxss. 先从这三个文件说起. 复制 app.js 这个文件是整个小程序的入口文件,开发者的逻辑代码在这里面实现,同时在这个文件夹里面可以定义全局变量.app.json 这个文件可以对小程序进行全局配置,决定…...
苹果MacOS系统傻瓜式本地部署AI绘画Stable Diffusion教程
Stable Diffusion的部署对小白来说非常麻烦,特别是又不懂技术的人。今天分享两个一键傻瓜式安装包,对小白来说非常有用。下面两个任选一个安装就可以。 一、DiffusionBee 简单介绍 DiffusionBee是基于stable diffusion的一个安装包,有图形…...
DBA之路-- 闪回恢复区FRA(Flash recovery area)与闪回特性(flashback)[待更新]
闪回恢复区FRA(Flash recovery area)与闪回特性(flashback) 1、闪回特性FB 用于快速简单恢复数据库中出现的认为误操作等逻辑错误 Flashback由undo表空间的撤销段内容为基础,受限于UNDO_RETENTON参数。要使用flashb…...
chatgpt赋能python:Python3.6.5到Python3.7.5:升级指南
Python 3.6.5到Python 3.7.5:升级指南 Python是一种广泛使用的编程语言,拥有强大的库和框架,能够开发各种类型的应用程序。在Python的发行版中,版本更新是常见的过程,以提供更好的性能和新的功能。 本文将介绍如何将…...
Element UI DatePicker 日期选择器
该组件选择周的时候,默认显示‘xxxx年第x周’,但在需求要显示为‘xxxx年x月第x周(mm.dd - mm.dd)’或者‘本周(mm.dd - mm.dd)’,最终效果为 首先需要修改v-model默认展示日期,控件中默认展示为周二&#x…...
sw2urdf导出的urdf文件中的惯性参数(inertial)错误的问题
现象描述 有时候,当我们使用solidworks建好我们的模型,然后利用【sw2urdf】导出后,发现其中的惯性参数,似乎不正确,ixx、izz这些参数都是很接近0的: 资料查找 其实这个不是我们设置的问题,而…...
AICG - Stable Diffusion 学习思考踩坑实录(待续补充)
关于模型 如果模型中没有各种角度的脚和手,无论你再怎么费劲心思,AI 都画不出来,目前C 站也没有什么好脚的例子,正面脚背面脚,但是没有侧面脚,脚这块还是很欠缺,希望未来有大牛能训练出来美脚 …...
LiangGaRy-学习笔记-Day19
1、回顾知识 1.1、文件系统说明 xfs与ext4文件系统 CentOS7以上:默认的就是XFS文件系统 xfs 使用的就是restore、dump等工具 CentOS6默认的就是ext4文件系统 extundelete工具就是用于ext4系统 1.2、回顾Linux文件系统 Linux文件系统是由三个部分组成 inode文…...
wordpress文章调用标签/哪些行业适合做网络推广
http://releases.ubuntu.com/12.04/...
成都网站建设 川icp备/网站软件免费下载
【答案】:C B B B A ①你是公司某项目的项目经理。在项目过程中,你发现公司不具备自主研发某一项可交付成果的能力,并寻求了采购部门的支持。采购部门现在召开一个面向潜在供应商的会议,针对需要采购的内容进行澄清,请…...
做搜狗网站优化排名/谁有恶意点击软件
一 简介 Apache ShardingSphere是一款开源的分布式数据库中间件组成的生态圈二 成员包含 Sharding-JDBC是一款轻量级的Java框架,在JDBC层提供上述核心功能,使用方式与正常的JDBC方式如出一辙,面向Java开发的用户。 Sharding-Proxy是一…...
视频类网站备案/常德seo
算法提高 素数求和 时间限制:1.0s 内存限制:256.0MB问题描述输入一个自然数n,求小于等于n的素数之和样例输入2样例输出2数据规模和约定测试样例保证 2 < n < 2,000,000作者注释:水平有限,此题有些不知所措——…...
高县网站建设/sem代运营
近期,中国联通31省骨干传输网(设备安装)施工集采项目、陕西联通工程建设(设备安装、室分、传输线路)项目、宁夏电信5G网络建设及新增通信工程(设备安装、管线施工)等项目陆续开标。整体来看&…...
河北高端网站建设/青岛网站建设公司排名
Danted/Socks5 代理服务_多IP_多出口的配置_3proxy 转载注明来源: 本文链接 来自osnosn的博客,写于 2020-03-01. danted 的配置文件为 /etc/danted.conf 查看,man danted.conf 中关于 external.rotation 的说明。 比如你有三个网口 eth0, eth1, eth2 让…...