NIO 在Tomcat中的应用(2)

从Poller池中获取一个Poller,将NioChannel注册到Poller上

protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //设置为非阻塞模式,以便通过selector进行查询 socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); //从对象池中获取一个NioChannel,tomcat会复用一切可以复用的对象以减少创建新对象所带来的消耗 NioChannel channel = nioChannels.pop(); if (channel == null) { // 没有获取到,那就新建一个呗 SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); // SSL这一块还没研究 if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); //重新设置SocketBufferHandler,将其设置为可写和可读 channel.reset(); } //从Poller池中获取一个Poller(按照次序获取,可以理解为一个圆环),并将Channel注册到上面 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; } Poller

从连接注册到Poller说起

不加锁的获取一个Poller

具体说明见代码

关键点:对一个数A取余会将余数的结果限制在A的范围内

/** * Return an available poller in true round robin fashion. * 很明显,取余的方式揭示了获取Poller的方法。你可以理解为 * Poller会组成一个圆环,这样我们就可以通过不断递增获取 * 下一个Poller,但是数据会溢出所以我们要取绝对值 * @return The next poller in sequence */ public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; } channel的注册

该方法会对新的建的连接进行封装,并以PollerEvent的形式注册到相应的Poller中

需要注意的是,真正的注册读事件并不是在此方法注册的(当前方法调用者为Acceptor线程),而是在Poller线程中注册读事件的

/** * Registers a newly created socket with the poller. * 将新建的socket注册到Poller上 * @param socket The newly created socket */ public void register(final NioChannel socket) { //以下代码为设置各种参数,可以从方法名进行推测,不再进行叙述 socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getSocketProperties().getSoTimeout()); ka.setWriteTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); //从缓存中获取一个PollerEvent PollerEvent r = eventCache.pop(); // 注册读事件 ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. // 如果没有从缓存中获取,那么就新建一个 if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); } Poller处理I/O 事件

Poller 处理I/O事件的的代码较长,而且细节也较多,总结其主要作用如下

检测是否有Acceptor提交PollerEvent,如果有则调用PolllerEvent的run方法注册读事件

在执行关键操作的时候检测该Poller是否被关闭如果是,则执行相应的资源释放和关闭操作

调用selector.select() 轮询事件,如果有读事件则交给processKey处理

@Override public void run() { // Loop until destroy() is called // 一直循环直到destroy方法被调用 while (true) { boolean hasEvents = false; try { if (!close) { // events 方法会处理Acceptor注册到Poller中的PollerEvent // 主要是注册读事件 hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } // 检测到关闭,则处理剩余的事件并关闭selector if (close) { // 处理Acceptors注册到Poller中的PollerEvent events(); //selector time out 或者poller被关闭就会调用timeout方法 timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); // 执行 select 操作,查询I/O事件 Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); // 处理检测到的I/O事件 processKey(sk, attachment); } }//while //timeout 会检查是否关闭,如果已经关闭并且有事件未处理会调用cancelledKey方法 //cancelledKey:该方法主要是对和该连接相关的资源执行关闭操作 timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); } processKey 处理I/O事件

processKey主要工作如下

再次检测Poller是否关闭,如果是则释放资源

检测查询到事件是否合法,如果合法则取消已注册到selector上的事件且被被本次轮询所查询到的事件

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/11625.html