Netty-Channel架构体系源码解读 (2)

我们看,AbstractChannel构造函数, 接受的子类传递进来的参数只有一个parent CHannel,而且,还不有可能为空, 所以在AbstractChannel是没有维护jdk底层的Channel的, 相反他会维护着Channel关联的EventLoop,我是怎么知道的呢? 首先,它的属性中存在这个字段,而且,将channel注册进selector的Register()方法是AbastractChannel重写的,Selector在哪呢? 在EventLoop里面,它怎么得到的呢? 它的子类传递了给了它

终于看出来点眉目,构造方法做了四件事

设置parent

如果当前创建的channel是客户端的channel,把parent初始化为他对应的parent

如果为服务端的channel,这就是null

创建唯一的id

创建针对channel进行io读写的unsafe

创建channel的处理器handler链 channelPipeline

AbstractChannel中维护着EventLoop

AbstractChanel的重要抽象内部类AbstractUnsafe 继承了Channel的内部接口Unsafe

他的源码如下,我贴出来了两个重要的方法, 关于这两个方法的解析,我写在代码的下面

protected abstract class AbstractUnsafe implements Unsafe { @Override // todo 入参 eventLoop == SingleThreadEventLoop promise == NioServerSocketChannel + Executor public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // todo 赋值给自己的 事件循环, 把当前的eventLoop赋值给当前的Channel上 作用是标记后续的所有注册的操作都得交给我这个eventLoop处理, 正好对应着下面的判断 // todo 保证了 即便是在多线程的环境下一条channel 也只能注册关联上唯一的eventLoop,唯一的线程 AbstractChannel.this.eventLoop = eventLoop; // todo 下面的分支判断里面执行的代码是一样的!!, 为什么? 这是netty的重点, 它大量的使用线程, 线程之间就会产生同步和并发的问题 // todo 下面的分支,目的就是把线程可能带来的问题降到最低限度 // todo 进入inEventLoop() --> 判断当前执行这行代码的线程是否就是 SingleThreadEventExecutor里面维护的那条唯一的线程 // todo 解释下面分支的必要性, 一个eventLoop可以注册多个channel, 但是channel的整个生命周期中所有的IO事件,仅仅和它关联上的thread有关系 // todo 而且,一个eventLoop在他的整个生命周期中,只和唯一的线程进行绑定, // // todo 当我们注册channel的时候就得确保给他专属它的thread, // todo 如果是新的连接到了, if (eventLoop.inEventLoop()) { // todo 进入regist0() register0(promise); } else { try { // todo 如果不是,它以一个任务的形式提交 事件循环 , 新的任务在新的线程开始, 规避了多线程的并发 // todo 他是SimpleThreadEventExucutor中execute()实现的,把任务添加到执行队列执行 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // todo 进入这个方法doRegister() // todo 它把系统创建的ServerSocketChannel 注册进了选择器 doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // todo 确保在 notify the promise前调用 handlerAdded(...) // todo 这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件。 // todo 如果需要的话,执行HandlerAdded()方法 // todo 正是这个方法, 回调了前面我们添加 Initializer 中添加 Accpter的重要方法 pipeline.invokeHandlerAddedIfNeeded(); // todo !!!!!!! 观察者模式!!!!!! 通知观察者,谁是观察者? 暂时理解ChannelHandler 是观察者 safeSetSuccess(promise); // todo 传播行为, 传播什么行为呢? 在head---> ServerBootStraptAccptor ---> tail传播事件ChannelRegistered , 也就是挨个调用它们的ChannelRegisted函数 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. // todo 对于服务端: javaChannel().socket().isBound(); 即 当Channel绑定上了端口 isActive()才会返回true // todo 对于客户端的连接 ch.isOpen() && ch.isConnected(); 返回true , 就是说, Channel是open的 打开状态的就是true if (isActive()) { if (firstRegistration) { // todo 在pipeline中传播ChannelActive的行为,跟进去 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 // todo 可以接受客户端的数据了 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); // todo 由于端口的绑定未完成,所以 wasActive是 false try { // todo 绑定端口, 进去就是NIO原生JDK绑定端口的代码 doBind(localAddress); // todo 端口绑定完成 isActive()是true } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // todo 根据上面的逻辑判断, 结果为 true if (!wasActive && isActive()) { invokeLater(new Runnable() { // todo 来到这里很重要, 向下传递事件行为, 传播行为的时候, 从管道的第一个节点开始传播, 第一个节点被封装成 HeadContext的对象 // todo 进入方法, 去 HeadContext里面查看做了哪些事情 // todo 她会触发channel的read, 最终重新为 已经注册进selector 的 chanel, 二次注册添加上感性趣的accept事件 @Override public void run() { pipeline.fireChannelActive(); } }); } // todo 观察者模式, 设置改变状态, 通知观察者的方法回调 safeSetSuccess(promise); } AbstractChannel抽象内部类的register(EventLoop,channelPromise)方法

这个方法,是将channel注册进EventLoop的Selector, 它的调用顺序如下:

本类方法 regist()--> 本类方法 register0() --> 本类抽象方法doRegister()

doRegister() 在这里设计成抽象方法,等着子类去具体的实现, 为啥这样做呢?

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpygjx.html