Netty源码—三、select

前面channel已经准备好了,可以接收来自客户端的请求了,NioEventLoop作为一个线程池,只有一个线程,但是有一个queue存储了待执行的task,由于只有一个线程,所以run方法是死循环,除非线程池shutdown。

这个run方法的主要作用:

执行selector.select,监听IO事件,并处理IO事件

由于NioEventLoop兼有线程池的功能,执行线程池中任务

// io.netty.channel.nio.NioEventLoop#run protected void run() { // loop,循环处理IO事件或者处理线程池中的task任务 for (;;) { try { // 判断接下来是是执行select还是直接处理IO事件和执行队列中的task // hasTask判断当前线程的queue中是否还有待执行的任务 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: // NioEventLoop默认不会有这种状态 continue; case SelectStrategy.SELECT: // 说明当前queue中没有task待执行 select(wakenUp.getAndSet(false)); // 唤醒epoll_wait if (wakenUp.get()) { selector.wakeup(); } // fall through default: } cancelledKeys = 0; needsToSelectAgain = false; // 这个比例是处理IO事件所需的时间和花费在处理task时间的比例 final int ioRatio = this.ioRatio; if (ioRatio == 100) { // 如果比例是100,表示每次都处理完IO事件后,执行所有的task try { processSelectedKeys(); } finally { // Ensure we always run tasks. // 保证能执行所有的task runAllTasks(); } } else { // 记录处理IO事件开始的时间 final long ioStartTime = System.nanoTime(); try { // 处理IO事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // 当前时间减去处理IO事件开始的时间就是处理IO事件花费的时间 final long ioTime = System.nanoTime() - ioStartTime; // 执行task的时间taskTime就是ioTime * (100 - ioRatio) / ioRatio // 如果taskTime时间到了还有未执行的task,runAllTasks也会返回 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { // 如果已经shutdown则关闭所有资源 if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } // io.netty.channel.DefaultSelectStrategy#calculateStrategy public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { // 如果还有task待执行则先执行selectNow,selectNow是立即返回的,不是阻塞等待 // 如果没有待执行的task则执行select,有可能是阻塞等待IO事件 return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; } // io.netty.channel.nio.NioEventLoop#selectNowSupplier private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { // epoll_wait的参数timeout可以指定超时时间,selectNow传入的参数是0,也就是不超时等待立即返回 return selectNow(); } }; select过程

epoll模型中最重要的一部分来了,Java把epoll_wait封装成了一个selector,可以理解为多路复用选择器,所以在调用selector.select过程中最后都是通过epoll_wait实现的,下面先看看SelectorImpl的两个select方法

public int select(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("Negative timeout"); // timeout = 0,传递给epoll_wait的参数是-1,表示阻塞等待 // timeout > 0,表示超时等待timeout时间后返回 return lockAndDoSelect((timeout == 0) ? -1 : timeout); } // 调用epoll_wait阻塞等待 public int select() throws IOException { return select(0); } // 调用epoll_wait立即返回 public int selectNow() throws IOException { return lockAndDoSelect(0); }

上面三个select方法都调用了lockAndDoSelect,只是timeout参数不同,其实最后就是调用epoll_wait参数不同,epoll_wait有一个timeout参数,表示超时时间

-1:阻塞

0:立即返回,非阻塞

大于0:指定微秒

// sun.nio.ch.EPollSelectorImpl#doSelect protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); // 省略中间代码... // 开始poll,这里的pollWrapper是EPollArrayWrapper pollWrapper.poll(timeout); // 省略中间代码... int numKeysUpdated = updateSelectedKeys(); // 如果epoll_wait是因为wakeup pipe解除阻塞返回 if (pollWrapper.interrupted()) { // Clear the wakeup pipe // 清除中断文件描述符接收到的IO事件 pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); // 读取完管道中的数据 IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; } int poll(long timeout) throws IOException { // 这里会向epoll注册每个socket需要监听的事件 updateRegistrations(); // 调用epollWait,这是一个native方法 updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); for (int i=0; i<updated; i++) { if (getDescriptor(i) == incomingInterruptFD) { interruptedIndex = i; interrupted = true; break; } } return updated; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsffyx.html