这一篇是 ChannelHandler 和 ChannelPipeline 的番外篇,主要从源码的角度来学习 ChannelHandler、ChannelHandler 和 ChannelPipeline 相互之间是如何建立联系和运行的。
一、添加 ChannelHandler从上一篇的 demo 中可以看到在初始化 Server 和 Client 的时候,都会通过 ChannelPipeline 的 addLast 方法将 ChannelHandler 添加进去
// Server.java // 部分代码片段 ServerBootstrap serverBootstrap = new ServerBootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); serverBootstrap.group(group) .channel(NioServerSocketChannel.class)Channel .localAddress(new InetSocketAddress("localhost", 9999)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加ChannelHandler socketChannel.pipeline().addLast(new OneChannelOutBoundHandler()); socketChannel.pipeline().addLast(new OneChannelInBoundHandler()); socketChannel.pipeline().addLast(new TwoChannelInBoundHandler()); } });在上面的代码片段中,socketChannel.pipeline()方法返回的是一个类型是 DefaultChannelPipeline 的实例,DefaultChannelPipeline 实现了 ChannelPipeline 接口
DefaultChannelPipeline 的 addLast 方法实现如下: // DefaultChannelPipeline.java @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { ObjectUtil.checkNotNull(handlers, "handlers"); for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; }
经过一系列重载方法调用,最终进入到下面的 addLast 方法
// DefaultChannelPipeline.java @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this; }在这个方法实现中,利用传进来的 ChannelHandler 在 newContext 创建了一个 AbstractChannelHandlerContext 对象。newContext 方法实现如下:
// DefaultChannelPipeline.java private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }这里创建并返回了一个类型为 DefaultChannelHandlerContext 的对象。从传入的参数可以看到,在这里将 ChannelHandlerContext、ChannelPipeline(this)和 ChannelHandler 三者建立了关系。
最后再看看 addLast0 方法实现:
这里出现了 AbstractChannelHandlerContext 的两个属性 prev 和 next,而 DefaultChannelPipeline 有一个属性 tail。从实现逻辑上看起来像是建立了一个双向链表的结构。下面的代码片段是关于 tail 和另一个相关属性 head:
// DefaultChannelPipeline.java public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; // ...... protected DefaultChannelPipeline(Channel channel) { // ...... tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } // ...... } // HeaderContext.java final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { // ...... @Override public ChannelHandler handler() { return this; } //...... } // TailContext.java final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { // ...... @Override public ChannelHandler handler() { return this; } // ...... }DefaultChannelPipeline 内部维护了两个 AbstractChannelHandlerContext 类型的属性 head、tail,而这两个属性又都实现了 ChannelHandler 的子接口。构造方法里将这两个属性维护成了一个双向链表。结合上面的 addLast0 方法实现,可以知道在添加 ChannelHandler 的时候,其实是在对 ChannelPipeline 内部维护的双向链表做插入操作。
下面是 ChannelHandlerContext 相关类的结构
所以,对 ChannelPipeline 做 add 操作添加 ChannelHandler 后,内部结构大体是这样的:
所有的 ChannelHandlerContext 组成了一个双向链表,头部是 HeadContext,尾部是 TailContext,因为它们都实现了 ChannelHandler 接口,所以它们内部的 Handler 也是自己。每次添加一个 ChannelHandler,将会新创建一个 DefaultChannelHandler 关联,并按照一定的顺序插入到链表中。
在 AbstractChannelHandlerContext 类里有一个属性 executionMask,在构造方法初始化时会对它进行赋值 // AbstractChannelHandlerContext.java // 省略部分代码 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.executionMask = mask(handlerClass); // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; } // 省略部分代码