Netty源码解析 -- 服务端启动过程 (2)

AbstractBootstrap#initAndRegister

final ChannelFuture initAndRegister() { Channel channel = null; try { // #1 channel = channelFactory.newChannel(); // #2 init(channel); } catch (Throwable t) { ... } // #3 ChannelFuture regFuture = config().group().register(channel); // #4 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }

#1 构造ServerChannel
AbstractBootstrap#channelFactory是一个ReflectiveChannelFactory对象,他通过反射生成Channel。
ServerBootstrap#channel方法负责构造ReflectiveChannelFactory,并指定具体的ServerChannel类。
(所以我们要通过该方法指定NioServerSocketChannel.class -- new ServerBootstrap().channel(NioServerSocketChannel.class))
#2 初始化ServerChannel,该方法由子类实现
#3 注册Channel到AcceptGroup,注意,config().group()返回AcceptGroup。
#4 如果IO操作发生了异常,需要关闭Channel。

NioServerSocketChannel#构造方法 -> NioServerSocketChannel#newSocket方法

private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a server socket.", e); } }

使用(jvm)SelectorProvider,构造一个(jvm)ServerSocketChannel。
这里完成了NIO网络通信第一步。

ServerBootstrap#init

void init(Channel channel) throws Exception { // #1 ... ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); // #2 p.addLast(new ChannelInitializer<Channel>() { public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { public void run() { // #3 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }

#1 设置ServerChannel的Option和Attribute属性。
#2 给ServerChannel的ChannelPipeline添加一个ChannelInitializer。
ChannelInitializer是一种特殊的ChannelHandler,initChannel方法负责完成一些Channel初始化工作,该方法的触发可以参考下文的延迟任务。
#3 上一步骤的ChannelInitializer负责给ServerChannel的ChannelPipeline添加一个ServerBootstrapAcceptor,并将SocketChannel的相关配置(childHandler,currentChildHandler,currentChildOptions,currentChildAttrs)交给它,ServerBootstrapAcceptor用于处理Accept事件,文章后面会解析。

AbstractBootstrap#initAndRegister方法#3步骤 -> SingleThreadEventLoop#register ->(通过Channel调用Unsafe)AbstractUnsafe#register

public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; // #1 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { public void run() { register0(promise); } }); } catch (Throwable t) { ... } } }

eventLoop.inEventLoop()判断当前线程是否为EventLoop执行线程。
如果是,直接执行操作 -- 调用register0方法处理。
否则,提交一个任务给EventLoop。
这是Netty中提交异步任务的通用格式,Netty中有大量类似代码。
注意,这里是异步的关键,将当前操作作为一个异步任务,提交给EventLoop处理,而不需要阻塞当前线程。
EventLoop实际上是一个(jvm)EventExecutor,通过execute方法可以给它任务。

AbstractUnsafe#register0

private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // #1 doRegister(); neverRegistered = false; registered = true; // #2 pipeline.invokeHandlerAddedIfNeeded(); // #3 safeSetSuccess(promise); // #4 pipeline.fireChannelRegistered(); // #5 if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // #6 closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }

#1 由子类实现具体注册操作
#2 执行DefaultChannelPipeline中的延迟任务
#3 设置promise状态为Success
#4 触发ChannelPipeline#fireChannelRegistered
#5 如果是首次注册,触发ChannelPipeline#fireChannelActive
isActive()方法判断当前Channel是否活跃
NioSocketChannel中调用SocketChannel#isOpen和SocketChannel#isConnected判断
NioServerSocketChannel中调用SelectableChannel#isOpen和ServerSocket#isBound方法判断
#6 异常处理,关闭Channel,设置promise状态为Failure。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwsjzf.html