ok,现在我们去直接去ServerBootStraptAcceptor中,他是ServerBootStrap的内部类,我们看它的channelRead()方法,源码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; // todo 给这个来连接的通道添加 childHandler,是我在Server中添加的childHandler, 实际上是那个MyChannelInitializer , 最终目的是添加handler child.pipeline().addLast(childHandler); // todo 给新来的Channel设置 options 选项 setChannelOptions(child, childOptions, logger); // todo 给新来的Channel设置 attr属性 for (Entry<AttributeKey<?>, Object> e : childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //todo 这里这!! 把新的channel注册进 childGroup childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception {我们可以看到,如下工作:
初始化属性
把客户端的channel的注册进childGroup中的EventLoop
在这里补一张channelGroup的继承图
我们看这个childGroup.regist()方法, 我们知道childGroup是workerGroup,在本类中,它的类型是EventLoopGroup,这是个接口类型的变量, 我们用点进去查看源码自然跳转进接口中,但是我们需要找他的实现类, 那么,是谁重写的它的方法呢?
大家去看看上面的图,它的直接实现类只有一个MultiThreadEventGroup, 其实大家想想看,现在的任务是将原生的客户端channel,注册进WorkerGroup中的EventLoop,那第一步是啥呢? 不得先从这个 事件循环组中拿出一个事件循环吗? 好,进去看MultiThreadEventGroup是如何实现的, 源码如下:
@Override public ChannelFuture register(Channel channel) { // todo next() -- 就在上面-> 根据轮询算法获取一个事件的执行器 EventExecutor // todo, 而每一个EventLoop对应一个EventExecutor 这里之所以是个组, 是因为, 我的机器内核决定我的 事件循环组有八个线程, // todo ?? ???? // todo 但是一会的责任并没有一直循环, 难道有效的bossGroup只有一条 // todo 再进去就是SingleThreadEventLoop对此方法的实现了 return next().register(channel); }是的,确实在获取事件循环,我们进行跟进next().register(channel), 现在是 eventloop.regist() ,当我们进入方法时,再次来到EventLoopGroup对这个方法的实现, ok,大家重新去看上面的图,一个eventloop.regist(),现在不再是 循环组.regist 而是 事件循环.regist, 而在图上,我们可以很轻松的看到 , 对EventLoopGroup接口的实现就是 SingleThreadEventLoop, 好,接着进去看它的实现, 源码如下:
// todo register来到这里 @Override public ChannelFuture register(Channel channel) { // todo ChannelPromise == channel+Executor 跟进去 // todo 再次调用 register, 就在下面 return register(new DefaultChannelPromise(channel, this)); }调用本类的register(new DefaultChannelPromise(channel, this) ,接着进去,源码如下: 同样解析写在源码下面
@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); // todo 重点来了 // todo channel() 获取通道对象 // todo unsafe() 获取仅供内部使用的unsafe对象 它定义在Channel接口中, 具体的对象是 Channel的子类, AbstractNioChannel // todo unsafe对象进行下一步注册 register * promise.channel().unsafe().register(this, promise); return promise; }promise.channel() 取出的是客户端的 NioSocketChanenl
promise.channel().unsafe() 是 AbstractUnsafe
来到regist()的实现类
方法调用链:
本类方法register
本类方法register0()
本类抽象方法doRegister()
pipeline.fireChannelRegistered();传播channel注册事件
pipeline.fireChannelActive(); 传播channel Active事件
二次注册事件
其中,上面的doRegister()是真正的将jdk原生的channel注册进原生的selector
pipeline.fireChannelRegistered();是在 header --> ServerBootStraptAccptor --> 用户自己添加的handler --> tail 中,挨个传递 ChannelRegistered, 就是从头开始调用它们的函数, 我们着重看下面的第三个
pipeline.fireChannelActive();其实是比较绕的,涉及到了pipeline中事件的传递,但是它的作用很大,通过传播channelActive挨个回调他们的状态,netty成功的给这条客户端的新连接注册上了netty能处理的感兴趣的事件
整体源码太长了我不一一贴出来了, 直接看关于pipeline.fireChannelActive();的源码,如下:
if (isActive()) { if (firstRegistration) { // todo 在pipeline中传播ChannelActive的行为,跟进去 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 // todo 可以接受客户端的数据了 beginRead(); }第一个判断, if (isActive())针对两个channel,存在两种情况
如果是服务端的channel, 只有在channel绑定完端口后,才会处于active的状态
如果是客户端的channel, 注册到selector+处于连接状态, 他就是active状态
满足条件,进入第一个分支判断,同样满足第一次注册的条件,开始传播事件