Netty源码—二、server启动(2) (2)

不难看出channel初始化的过程就是创建了一个socket,接下来看看channel的注册

// config()返回的是ServerBootstrapConfig // group()返回的是parentGroup,对应开始的例子是bossGroup,也就是NioEventLoopGroup // 所以是调用的是NioEventLoopGroup.register,该方法继承自MultithreadEventLoopGroup ChannelFuture regFuture = config().group().register(channel); // io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel) public ChannelFuture register(Channel channel) { // 使用的是bossGroup,next方法选出第一个NioEventLoop,调用NioEventLoop.register,该方法继承自SingleThreadEventLoop return next().register(channel); } // io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel) public ChannelFuture register(Channel channel) { // 注册的还是使用一个promise,可以异步注册 return register(new DefaultChannelPromise(channel, this)); } // io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise) public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); // channel返回的是NioServerSocketChannel // unsafe返回的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe // 所以调用的是NioMessageUnsafe.register,该方法继承自AbstractUnsafe promise.channel().unsafe().register(this, promise); return promise; } // io.netty.channel.AbstractChannel.AbstractUnsafe#register public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 省略中间代码... // 当前线程是main线程,eventLoop是bossGroup中的一个线程,所以这里返回false,会在新线程中执行register0 if (eventLoop.inEventLoop()) { register0(promise); } else { try { // 在eventLoop中执行 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { // 省略中间代码... } } } private void register0(ChannelPromise promise) { try { // 省略中间代码... // 这里面主要是调用ServerSocketChannelImpl.register,注册的过程中主要是将需要监听的文件描述符添加到EPollArrayWrapper中 doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // 省略中间代码... } }

下面看看channel注册过程中做了哪些事情

// sun.nio.ch.SelectorImpl#register // 这里ch是ServerSocketChannelImpl // attachment是NioServerSocketChannel // ops是0,这里并不注册需要监听的事件 // selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); // 创建一个SelectionKeyImpl, SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { // 调用sun.nio.ch.EPollSelectorImpl#implRegister implRegister(k); } // 设置当前channel关注的事件 k.interestOps(ops); return k; } protected void implRegister(SelectionKeyImpl ski) { if (closed) throw new ClosedSelectorException(); SelChImpl ch = ski.channel; int fd = Integer.valueOf(ch.getFDVal()); fdToKey.put(fd, ski); // poolWrapper是epoll监听事件所需数据结构的java版本 // add方法调用setUpdateEvents来指定当前socket监听的事件 pollWrapper.add(fd); keys.add(ski); } /** * struct epoll_event { * __uint32_t events; * epoll_data_t data; * }; * 由于一开始并不知道会监听多少个socket,所以jdk默认指定了MAX_UPDATE_ARRAY_SIZE * 如果小于MAX_UPDATE_ARRAY_SIZE则使用数组eventsLow存储每个socket监听的事件,eventsLow的下标就是socket对应的文件描述符 * 如果大于等于MAX_UPDATE_ARRAY_SIZE个则使用EPollArrayWrapper#eventsHigh,也就是一个map来保存每个socket监听的事件 * * 注意这个时候调用setUpdateEvents的events参数是0,也就是还没有执行监听的事件类型 */ private void setUpdateEvents(int fd, byte events, boolean force) { if (fd < MAX_UPDATE_ARRAY_SIZE) { if ((eventsLow[fd] != KILLED) || force) { eventsLow[fd] = events; } } else { Integer key = Integer.valueOf(fd); if (!isEventsHighKilled(key) || force) { eventsHigh.put(key, Byte.valueOf(events)); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfjxw.html