When the selector's poll reports that some channel has an event it is interested in, Netty must precisely match, out of hundreds or thousands of channels, the one on which the IO event occurred. So the selector stores the channel into the key's attachment ahead of time, at registration, and retrieves it later.
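The same attach-at-registration, retrieve-at-selection pattern can be sketched with plain JDK NIO (the class name `AttachmentDemo` is mine for illustration; Netty attaches its `AbstractNioChannel` in the same position):

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class AttachmentDemo {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Register with the channel itself as the third (attachment) argument,
        // just like Netty does; interest ops start at 0 here for simplicity.
        SelectionKey key = serverChannel.register(selector, 0, serverChannel);

        // Later, when the selector reports this key, the attachment hands the
        // channel straight back -- no search over all registered channels.
        Object attached = key.attachment();
        System.out.println(attached == serverChannel);

        serverChannel.close();
        selector.close();
    }
}
```

Because the attachment is carried on the `SelectionKey` itself, the lookup is O(1) per ready key rather than a scan over every registered channel.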
The last parameter is `this`: on the server side at bootstrap it is the NioServerSocketChannel; on the client side it is the NioSocketChannel.
OK, the dig-the-hole, fill-the-hole process is now clear. Next we enter processSelectedKey(SelectionKey k, AbstractNioChannel ch) to execute the IO work; the source is below. As we can see, the concrete IO handling is all delegated to the Channel's internal unsafe() object; we will not follow it further here, that will be covered in a follow-up post:
```java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // This unsafe, like the key, is bound one-to-one to its Channel
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) { // make sure the key is still valid
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) { // guard against multi-threaded races
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    // Both the NioServerSocketChannel and the SelectionKey are valid, so enter the processing phase below
    try {
        // The operations this key is ready for
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop (the infamous JDK epoll spin bug)
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
```

Processing non-IO tasks: once the IO events above have been handled, the third act begins, processing the tasks in the task queue via runAllTasks(timeoutNanos). It too runs under a time budget, the deadline. It mainly completes the following steps:
Aggregate tasks: move due scheduled tasks into the ordinary task queue
Loop, pulling tasks from the ordinary queue
Execute each task
After every 64 tasks, check whether the deadline has passed
Do the wrap-up work
The source is as follows:
```java
protected boolean runAllTasks(long timeoutNanos) {
    // Aggregate tasks: move due scheduled tasks into the ordinary task queue (go in and see)
    fetchFromScheduledTaskQueue();
    // Take one task from the ordinary queue
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    // Compute the cut-off time; task execution should preferably not run past it
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    // Execute tasks in a for loop
    for (;;) {
        // Execute the task; this method calls task.run()
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        // Fetch the next task
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    // A wrap-up hook runs once the batch of tasks finishes
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
```

How does NioEventLoop aggregate tasks?
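The aggregation step, fetchFromScheduledTaskQueue(), drains every scheduled task whose deadline has passed into the ordinary task queue. A minimal standalone sketch of that idea (the class and field names here are mine, not Netty's; Netty uses its own priority queue and MPSC queue types):

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

public class AggregationSketch {
    // A scheduled task ordered by its absolute deadline in nanoseconds
    static class ScheduledTask implements Comparable<ScheduledTask> {
        final long deadlineNanos;
        final Runnable body;
        ScheduledTask(long deadlineNanos, Runnable body) {
            this.deadlineNanos = deadlineNanos;
            this.body = body;
        }
        @Override
        public int compareTo(ScheduledTask o) {
            return Long.compare(deadlineNanos, o.deadlineNanos);
        }
    }

    final PriorityQueue<ScheduledTask> scheduledTaskQueue = new PriorityQueue<>();
    final Queue<Runnable> taskQueue = new ArrayDeque<>();

    // Move every scheduled task whose deadline has passed into taskQueue,
    // so the runAllTasks loop sees them as ordinary tasks.
    void fetchFromScheduledTaskQueue(long nowNanos) {
        for (;;) {
            ScheduledTask t = scheduledTaskQueue.peek();
            if (t == null || t.deadlineNanos > nowNanos) {
                break; // the heap is ordered by deadline, so we can stop early
            }
            scheduledTaskQueue.poll();
            taskQueue.offer(t.body);
        }
    }

    public static void main(String[] args) {
        AggregationSketch loop = new AggregationSketch();
        loop.scheduledTaskQueue.add(new ScheduledTask(10, () -> {}));
        loop.scheduledTaskQueue.add(new ScheduledTask(100, () -> {}));
        loop.fetchFromScheduledTaskQueue(50); // "now" = 50ns: only the first is due
        System.out.println(loop.taskQueue.size());
    }
}
```

Because the scheduled queue is a min-heap on deadline, the drain stops at the first not-yet-due task, so aggregation costs only as much as the number of tasks that are actually due.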