io.netty.channel.nio.NioEventLoop
@Override protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 // ... continue; } selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0); // This will run the minimum number of tasks } } finally { // Always handle shutdown even if the loop processing threw an exception. } } }先简单解释一下上面的代码,部分细节后面再扣。run()方法里面是个死循环,大致是这样的,这里的描述并不完全准确,是这么个意思,taskQueue里面如果有task,就不断poll执行队列里的task,具体见runAllTasks();否则,就selector.select(),若有IO事件,则通过processSelectedKeys()来处理。
讲到这里,正好刚才不是往taskQueue里放了个Runnable吗,再贴一下上面那个Runnable的代码
new Runnable() { @Override public void run() { register0(promise); } };于是就要执行Runnable里面register0(promise)了。
io.netty.channel.AbstractChannel
private void register0(ChannelPromise promise) { //(1)把ServerSocketChannel注册到了Selector上 doRegister(); // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. //(2)触发pipeline中的ChannelHandler的handlerAdded()方法调用 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //(3)触发pipeline中的ChannelInboundHandler的channelRegistered()方法调用 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } }上面我按照自己的理解,在代码中加了少许注释,下面按照我注释的顺序依次解释一下。
(1) doRegister()
io.netty.channel.nio.AbstractNioChannel
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } }这里显然是把ServerSocketChannel注册到了Selector上。
(2) pipeline.invokeHandlerAddedIfNeeded()
作用:触发pipeline中的ChannelHandler的handlerAdded()方法调用
io.netty.channel.DefaultChannelPipeline
final void invokeHandlerAddedIfNeeded() { if (firstRegistration) { firstRegistration = false; // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, // that were added before the registration was done. callHandlerAddedForAllHandlers(); } }上面的注释清晰地告诉我们,现在ServerSocketChannel已经注册到EventLoop上,是时候该调用Pipeline中的ChannelHandlers。到这里,就能与上面初始化ServerSocketChannel对接起来了,猜测应该会触发上面的ChannelInitializer的调用。
io.netty.channel.DefaultChannelPipeline
private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; // Null out so it can be GC'ed. this.pendingHandlerCallbackHead = null; } // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside // the EventLoop. PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { task.execute(); task = task.next; } }这里需要先解释一下为什么又突然冒出来PendingHandlerCallback。是这样的,在addLast(ChannelHandler... handlers)时,实际上调了下面的方法。