NioEventLoop启动流程源码解析 (2)

创建/绑定完成了新的线程后,第二波高潮来了, SingleThreadEventExecutor.this.run(); 这行代码的意思是,调用本类的Run()方法,这个Run()方法就是真正在干活的事件循环,但是呢, 在本类中,Run()是一个抽象方法,因此我们要去找他的子类,那么是谁重写的这个Run()呢? 就是NioEventLoop, 它根据自己需求,重写了这个方法

小结: 到现在,NioEventLoop的线程已经开启了,下面的重头戏就是看他是如何进行事件循环的

NioEventLoop的事件循环run()

我们来到了NioEventLoop的run(), 他是个无限for循环, 主要完成了下面三件事

轮询IO事件

处理IO事件

处理非IO任务

这是NioEventLoop的run()的源码,删除了部分注解和收尾工作,

/** * todo select() 检查是否有IO事件 * todo ProcessorSelectedKeys() 处理IO事件 * todo RunAllTask() 处理异步任务队列 */ @Override protected void run() { for (; ; ) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: // todo 轮询IO事件, 等待事件的发生, 本方法下面的代码是处理接受到的感性趣的事件, 进入查看本方法 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; // todo 默认50 // todo 如果ioRatio==100 就调用第一个 processSelectedKeys(); 否则就调用第二个 if (ioRatio == 100) { try { // todo 处理 处理发生的感性趣的事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // todo 用于处理 本 eventLoop外的线程 扔到taskQueue中的任务 runAllTasks(); } } else {// todo 因为ioRatio默认是50 , 所以来else // todo 记录下开始的时间 final long ioStartTime = System.nanoTime(); try { // todo 处理IO事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // todo 根据处理IO事件耗时 ,控制 下面的runAllTasks执行任务不能超过 ioTime 时间 final long ioTime = System.nanoTime() - ioStartTime; // todo 这里面有聚合任务的逻辑 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } } }

下面进入它的select(),我们把select()称作: 基于deadline的任务穿插处理逻辑
下面直接贴出它的源码:下面的代码中我写了一些注解了, 主要是分如下几步走

根据当前时间计算出本次for()的最迟截止时间, 也就是他的deadline

判断1 如果超过了 截止时间,selector.selectNow(); 直接退出

判断2 如果任务队列中出现了新的任务 selector.selectNow(); 直接退出

经过了上面12两次判断后, netty 进行阻塞式select(time) ,默认是1秒这时可会会出现空轮询的Bug

判断3 如果经过阻塞式的轮询之后,出现的感兴趣的事件,或者任务队列又有新任务了,或者定时任务中有新任务了,或者被外部线程唤醒了 都直接退出循环

如果前面都没出问题,最后检验是否出现了JDK空轮询的BUG

// todo 循环接受IO事件 // todo 每次进行 select() 操作时, oldWakenUp被标记为false private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { ///todo ----------------------------------------- 如下部分代码, 是 select()的deadLine及任务穿插处理逻辑----------------------------------------------------- // todo selectCnt这个变量记录了 循环 select的次数 int selectCnt = 0; // todo 记录当前时间 long currentTimeNanos = System.nanoTime(); // todo 计算出估算的截止时间, 意思是, select()操作不能超过selectDeadLineNanos这个时间, 不让它一直耗着,外面也可能有任务等着当前线程处理 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // -------for 循环开始 ------- for (; ; ) { // todo 计算超时时间 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) {// todo 如果超时了 , 并且selectCnt==0 , 就进行非阻塞的 select() , break, 跳出for循环 if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // todo 判断任务队列中时候还有别的任务, 如果有任务的话, 进入代码块, 非阻塞的select() 并且 break; 跳出循环 //todo 通过cas 把线程安全的把 wakenU设置成true表示退出select()方法, 已进入时,我们设置oldWakenUp是false if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } ///todo ----------------------------------------- 如上部分代码, 是 select()的deadLine及任务穿插处理逻辑----------------------------------------------------- ///todo ----------------------------------------- 如下, 是 阻塞式的select() ----------------------------------------------------- // todo 上面设置的超时时间没到,而且任务为空,进行阻塞式的 select() , timeoutMillis 默认1 // todo netty任务,现在可以放心大胆的 阻塞1秒去轮询 channel连接上是否发生的 selector感性的事件 int selectedKeys = selector.select(timeoutMillis); // todo 表示当前已经轮询了SelectCnt次了 selectCnt++; // todo 阻塞完成轮询后,马上进一步判断 只要满足下面的任意一条. 也将退出无限for循环, select() // todo selectedKeys != 0 表示轮询到了事件 // todo oldWakenUp 当前的操作是否需要唤醒 // todo wakenUp.get() 可能被外部线程唤醒 // todo hasTasks() 任务队列中又有新任务了 // todo hasScheduledTasks() 当时定时任务队列里面也有任务 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } ///todo ----------------------------------------- 如上, 是 阻塞式的select() ----------------------------------------------------- if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } // todo 每次执行到这里就说明,已经进行了一次阻塞式操作 ,并且还没有监听到任何感兴趣的事件,也没有新的任务添加到队列, 记录当前的时间 long time = System.nanoTime(); // todo 如果 当前的时间 - 超时时间 >= 开始时间 把 selectCnt设置为1 , 表明已经进行了一次阻塞式操作 // todo 每次for循环都会判断, 当前时间 currentTimeNanos 不能超过预订的超时时间 timeoutMillis // todo 但是,现在的情况是, 虽然已经进行了一次 时长为timeoutMillis时间的阻塞式select了, // todo 然而, 我执行到当前代码的 时间 - 开始的时间 >= 超时的时间 // todo 但是 如果 当前时间- 超时时间< 开始时间, 也就是说,并没有阻塞select, 而是立即返回了, 就表明这是一次空轮询 // todo 而每次轮询 selectCnt ++; 于是有了下面的判断, if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && // todo selectCnt如果大于 512 表示cpu确实在空轮询, 于是rebuild Selector selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); // todo 它的逻辑创建一个新的selectKey , 把老的Selector上面的key注册进这个新的selector上面 , 进入查看 rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. // todo 解决了Select空轮询的bug selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } //// -----------for 循环结束 -------------- if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway } } 什么是Jdk的Selector空轮询

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxwxs.html