You Cannot See Netty's True Face Only Because You Have Not Read This Scripture (8)

io.netty.channel.DefaultChannelPipeline

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

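Notice the three comment lines in the code above: they explain where the PendingHandlerCallback mentioned earlier comes from. In other words, when a ChannelHandler is added to the pipeline while the Channel has not yet been registered with an EventLoop, the current AbstractChannelHandlerContext is wrapped in a PendingHandlerCallback, which waits to be triggered once the Channel is registered.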
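The deferral described above can be sketched in plain Java without Netty on the classpath. Everything here is hypothetical and made up for illustration: `DeferredAddPipeline`, `markRegistered`, and the `events` log are not Netty APIs, and a plain list of `Runnable`s stands in for the pending-callback bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pipeline; this is NOT Netty's DefaultChannelPipeline.
class DeferredAddPipeline {
    private final List<String> handlers = new ArrayList<>();           // handler names in pipeline order
    private final List<Runnable> pendingCallbacks = new ArrayList<>(); // deferred "handlerAdded" calls
    final List<String> events = new ArrayList<>();                     // what happened, in order
    private boolean registered;

    synchronized DeferredAddPipeline addLast(String name) {
        handlers.add(name);
        if (!registered) {
            // Not registered yet: remember the callback instead of running it now.
            pendingCallbacks.add(() -> events.add("handlerAdded:" + name));
            return this;
        }
        // Already registered: the callback can run right away.
        events.add("handlerAdded:" + name);
        return this;
    }

    // Called once registration has happened; drains the pending callbacks in insertion order.
    synchronized void markRegistered() {
        registered = true;
        events.add("registered");
        for (Runnable callback : pendingCallbacks) {
            callback.run();
        }
        pendingCallbacks.clear();
    }

    public static void main(String[] args) {
        DeferredAddPipeline p = new DeferredAddPipeline();
        p.addLast("a").addLast("b");
        System.out.println(p.events); // prints [] because both callbacks are still pending
        p.markRegistered();
        p.addLast("c");
        System.out.println(p.events);
    }
}
```

In this sketch, handlers added before registration only get their callback after `markRegistered()`, while a handler added afterwards gets it immediately, which mirrors the two branches of the `if (!registered)` check.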

Back to the main topic: after several hops, PendingHandlerCallback.execute() ends up calling the ChannelHandler's handlerAdded(), as shown below:

io.netty.channel.AbstractChannelHandlerContext

final void callHandlerAdded() throws Exception {
    // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
    // any pipeline events ctx.handler() will miss them because the state will not allow it.
    if (setAddComplete()) {
        handler().handlerAdded(this);
    }
}
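The ordering the comment insists on (mark the context as added first, then call handlerAdded) can be illustrated with a small sketch. `AddStateContext`, its three states, and the `fire` method are hypothetical simplifications invented for this example; they are assumptions about the general idea, not Netty's actual fields or state machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical context that only delivers events while it is in the ADD_COMPLETE state.
class AddStateContext {
    static final int INIT = 0, ADD_COMPLETE = 1, REMOVE_COMPLETE = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    final List<String> delivered = new ArrayList<>(); // events the handler actually saw

    // Flip to ADD_COMPLETE unless the context was already removed.
    boolean setAddComplete() {
        for (;;) {
            int old = state.get();
            if (old == REMOVE_COMPLETE) {
                return false; // never resurrect a removed context
            }
            if (state.compareAndSet(old, ADD_COMPLETE)) {
                return true;
            }
        }
    }

    void setRemoved() {
        state.set(REMOVE_COMPLETE);
    }

    // An event reaches the handler only if the context is fully added.
    void fire(String event) {
        if (state.get() == ADD_COMPLETE) {
            delivered.add(event);
        }
    }

    // The state is updated BEFORE the callback, so events fired from inside it are not lost.
    void callHandlerAdded() {
        if (setAddComplete()) {
            delivered.add("handlerAdded");
            fire("eventFromHandlerAdded"); // stands in for an event generated inside handlerAdded
        }
    }

    public static void main(String[] args) {
        AddStateContext ctx = new AddStateContext();
        ctx.callHandlerAdded();
        System.out.println(ctx.delivered);
    }
}
```

If the state were flipped only after the callback, `fire` would see `INIT` and silently drop the event, which is the situation the original comment warns about.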

Now let's look back at ChannelInitializer.

io.netty.channel.ChannelInitializer

/**
 * {@inheritDoc} If override this method ensure you call super!
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        initChannel((C) ctx.channel());
        return true;
    }
    return false;
}
/**
 * This method will be called once the {@link Channel} was registered. After the method returns this instance
 * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
 *
 * @param ch the {@link Channel} which was registered.
 * @throws Exception is thrown if an error occurs. In that case it will be handled by
 *                   {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default close
 *                   the {@link Channel}.
 */
protected abstract void initChannel(C ch) throws Exception;

So this is what ultimately triggers the initChannel call: the initChannel override written earlier, when the ServerSocketChannel was initialized, runs at this point.
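The `initMap.add(ctx)` check in the code above relies on `Set.add` returning false when the element is already present. A minimal sketch of that re-entrance guard follows; `OnceInitializer`, `initOnce`, and `initCalls` are hypothetical names for this example, and using `ConcurrentHashMap.newKeySet()` as the backing set is an assumption of the sketch.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical guard: runs an initialization body at most once per context object.
class OnceInitializer {
    private final Set<Object> initialized = ConcurrentHashMap.newKeySet();
    int initCalls; // how many times the body was actually run

    boolean initOnce(Object ctx, Runnable body) {
        if (initialized.add(ctx)) { // add() returns false if ctx was already recorded
            initCalls++;
            body.run();
            return true;
        }
        return false; // re-entrant or repeated call: do nothing
    }

    public static void main(String[] args) {
        OnceInitializer guard = new OnceInitializer();
        Object ctx = new Object();
        // The body tries to initialize the same context again, simulating re-entrance.
        boolean outer = guard.initOnce(ctx, () -> guard.initOnce(ctx, () -> { }));
        System.out.println(outer + " " + guard.initCalls); // prints "true 1"
    }
}
```

Because the context is recorded before the body runs, a nested call for the same context falls through to `return false` instead of initializing twice.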

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

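After this initChannel has run, the ChannelPipeline looks roughly like this (assuming config.handler() returned null; the ChannelInitializer itself is removed from the pipeline once initChannel returns):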

head --> tail

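Note that at this point the ServerBootstrapAcceptor has not yet been added to the ChannelPipeline. Instead, the task that adds it is placed in the Queue<Runnable> taskQueue mentioned earlier, as follows: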

ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        pipeline.addLast(new ServerBootstrapAcceptor(
                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});
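Why the acceptor is still missing right after initChannel returns can be shown with a toy task queue. `MiniEventLoop` and `DeferredAcceptorDemo` are hypothetical classes written for this sketch, and the pipeline is modeled as a plain list of names; the only assumption carried over from the text is that `execute` enqueues the task rather than running it on the spot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded loop: execute() only enqueues, runAllTasks() drains in FIFO order.
class MiniEventLoop {
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();

    void execute(Runnable task) {
        taskQueue.add(task);
    }

    void runAllTasks() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run(); // tasks enqueued while running are picked up by later iterations
        }
    }
}

class DeferredAcceptorDemo {
    static List<String> run() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> pipeline = new ArrayList<>(); // stands in for the handlers between head and tail
        List<String> log = new ArrayList<>();

        // This outer task plays the role of initChannel running on the loop.
        loop.execute(() -> {
            loop.execute(() -> pipeline.add("ServerBootstrapAcceptor")); // queued, not run yet
            log.add("after initChannel: " + pipeline);
        });

        loop.runAllTasks();
        log.add("after queued task: " + pipeline);
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

The first log entry shows an empty pipeline because the add task is still waiting in the queue; it runs only after the task that scheduled it has finished.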

As for what ServerBootstrapAcceptor does internally, that will be covered in detail later.

Let's move on. With doRegister() and pipeline.invokeHandlerAddedIfNeeded() covered above, next comes pipeline.fireChannelRegistered().

(3) pipeline.fireChannelRegistered()

Purpose: triggers the channelRegistered() method of the ChannelInboundHandlers in the pipeline.
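As a rough mental model, an inbound event such as channelRegistered travels from one handler to the next, and it only continues if the current handler forwards it. The sketch below uses hypothetical `ChainNode` and `ChainDemo` classes with a `forwards` flag; it is a simplified illustration of the chain idea, not Netty's context implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical node in a singly linked handler chain.
class ChainNode {
    final String name;
    final boolean forwards; // whether this handler passes the event on to the next node
    ChainNode next;

    ChainNode(String name, boolean forwards) {
        this.name = name;
        this.forwards = forwards;
    }

    void channelRegistered(List<String> log) {
        log.add(name); // this handler saw the event
        if (forwards && next != null) {
            next.channelRegistered(log);
        }
    }
}

class ChainDemo {
    // Builds first -> middle -> last and fires the event at the first node.
    static List<String> fire(boolean middleForwards) {
        ChainNode first = new ChainNode("first", true);
        ChainNode middle = new ChainNode("middle", middleForwards);
        ChainNode last = new ChainNode("last", true);
        first.next = middle;
        middle.next = last;

        List<String> log = new ArrayList<>();
        first.channelRegistered(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(fire(true));  // prints [first, middle, last]
        System.out.println(fire(false)); // prints [first, middle]
    }
}
```

When the middle node does not forward, the last node never sees the event, which is why inbound handlers normally pass events along unless they intend to consume them.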

Again, let's briefly trace into the source.

io.netty.channel.AbstractChannelHandlerContext

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}
private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            // This is where channelRegistered() is invoked on the handler
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRegistered();
    }
}
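The static method above follows a pattern that recurs throughout the code shown in this article: run the work directly when already on the executor's thread, otherwise submit it as a task. A minimal sketch of that pattern with the JDK's single-thread executor follows; `MiniLoopExecutor`, `runInLoop`, and `DispatchDemo` are hypothetical names, and tracking the loop thread through a custom thread factory is an assumption of this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical executor that knows which thread is "its" loop thread.
class MiniLoopExecutor {
    private volatile Thread loopThread;
    private final ExecutorService delegate;

    MiniLoopExecutor() {
        delegate = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-loop");
            t.setDaemon(true);
            loopThread = t; // remember the single worker thread
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise hand the task to the loop.
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            delegate.execute(task);
        }
    }

    void shutdown() {
        delegate.shutdown();
    }
}

class DispatchDemo {
    // Returns the thread names seen by an outer task and by a task dispatched from inside it.
    static String[] run() {
        MiniLoopExecutor loop = new MiniLoopExecutor();
        String[] threads = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        loop.runInLoop(() -> {                                   // called from outside: submitted
            threads[0] = Thread.currentThread().getName();
            loop.runInLoop(() -> threads[1] = Thread.currentThread().getName()); // inside: inline
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.println(t[0] + " " + t[1]);
    }
}
```

Both tasks end up on the same loop thread, so handler code never has to worry about which thread originally triggered the event.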
