Netty源码解析 -- 服务端启动过程 (3)

AbstractUnsafe#doRegister -> AbstractNioChannel#doRegister

protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // #1 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ... } } }

#1 javaChannel()获取(jvm)SelectableChannel,
eventLoop().unwrappedSelector()获取AcceptGroup维护的Selector(jvm)
这里将(jvm)ServerSocketChannel注册到(jvm)Selector,但还没有注册关注事件Key。
从Netty层面看,将Channel注册到EventLoop中。
注意,这里将当前NioServerSocketChannel作为channle#attachment,后面使用它来判断是否为IO事件。

AbstractUnsafe#register0方法#5步骤 -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
这里涉及ChannelPipeline的事件传播,后面解析ChannelPipeline时详细说明。

HeadContext#channelActive会调用readIfIsAutoRead方法,判断是否开启autoRead,开启则自动触发read事件处理方法。
HeadContext#readIfIsAutoRead -> DefaultChannelPipeline#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception { // #1 final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); // #2 if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }

#1 selectionKey是Selector中关注事件集合(由AbstractNioChannel#doRegister方法中生成)
#2 这里注册了关注事件readInterestOp。
那么readInterestOp的值是什么呢? 它在AbstractNioChannel#构造方法中赋值,真正的值来自NioServerSocketChannel构造方法,可以看到,它在ServerChannel中固定为SelectionKey.OP_ACCEPT。
到这里,注册ServerChannel的关注事件OP_ACCEPT。
这里完成NIO网络通信第二步,注册关注事件。

AbstractBootstrap.doBind0 -> AbstractChannel#bind -> DefaultChannelPipeline#bind -> HeadContext#bind -> AbstractUnsafe#bind -> NioServerSocketChannel#doBind

protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }

根据不同JDK版本,调用不同的bind方法。
这里完成了NIO网络通信第三步,分配套接字地址,开始socket监听。

Accept事件处理

下面我们来看一下AcceptGroup中如何处理ServerChannel上监听到的accept事件。

这里涉及EventLoop的相关内容,后面有对应解析文章。
现在直接看Accept事件的处理方法NioMessageUnsafe#read

public void read() { ... try { try { do { // #1 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // #2 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); // #3 pipeline.fireChannelReadComplete(); ... } ... }

#1 调用NioServerSocketChannel#doReadMessages,处理Accept事件。
注意,readBuf是一个List<Object>,用于接收处理结果。

allocHandle.continueReading(),判断是否需要继续执行,这里都是返回false
#2 触发DefaultChannelPipeline#fireChannelRead
#3 触发DefaultChannelPipeline#fireChannelReadComplete

NioServerSocketChannel#doReadMessages

protected int doReadMessages(List<Object> buf) throws Exception { // #1 SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { // #2 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ... } return 0; }

#1 调用(jvm)ServerSocketChannel#accept方法,生成的(jvm)SocketChannel
#2 使用(jvm)SocketChannel构造NioSocketChannel

前面说过,ServerChannel注册到AcceptGroup时,会给ServerChannel的ChannelPipeline添加一个ServerBootstrapAcceptor,用于处理accept事件。
NioMessageUnsafe#read方法#2步骤 -> DefaultChannelPipeline#fireChannelRead -> ServerBootstrapAcceptor#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) { // #1 final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { // #2 childGroup.register(child).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwsjzf.html