本篇内容主要梳理一下 Netty 中编解码器的逻辑和编解码器在 Netty 整个链路中的位置。
前面我们在分析 ChannelPipeline 的时候说到入站和出站事件的处理都在 pipeline 中维护着,通过list的形式将处理事件的 handler 按照先后关系保存为一个列表,有对应的事件过来就按照列表顺序取出 handler 来处理事件。
如果是入站事件按照 list 自然顺序调用 handler 来处理,如果是出站事件则反序调用 handler 来处理。所有的入站事件处理器都继承自 ChannelInboundHandler,出站事件处理器都继承自 ChannelOutboundHandler。channelPipeline 上的注释有说明 inbound 事件的传播顺序是:
* 入栈事件传播方法 * <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li> * <li>{@link ChannelHandlerContext#fireChannelActive()}</li> * <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li> * <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li> * <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li> * <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li> * <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li> * <li>{@link ChannelHandlerContext#fireChannelInactive()}</li> * <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li> * </ul> * </li>即 handler 中的方法调用顺序是如上所示,我们主要关注的点在 channelRead() 方法上。下面就由 channelRead() 出发,去看看编解码器的使用。
1. channelRead 解析inbound 事件的入口在 NioEventLoop #run() 方法#processSelectedKeys()#processSelectedKeysPlain()#processSelectedKey()#unsafe.read()。
这里的 UnSafe 是定义在 Channel 接口中的子接口,并不是 JDK 的 UnSafe 类。UnSafe作为 channel 的内部类承担着 channel 网络读写相关的功能,这里可以抽出一节讨论,不是本篇的重点。我们继续看 UnSafe 的子类 NioByteUnsafe 重写的 read() 方法:
@Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //allocator负责建立缓冲区 final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { //分配内存 byteBuf = allocHandle.allocate(allocator); //读取socketChannel数据到分配的byteBuf,对写入的大小进行一个累计叠加 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; //触发pipeline的ChannelRead事件来对byteBuf进行后续处理 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); // 记录总共读取的大小 allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }read()方法从内存读取数据给到 ByteBuf,上一节我们提到了ByteBuf,Netty 自己实现的 byte 字节累加器。下面有一个while循环,每次读取的 bytebuf 会给到 pipeline.fireChannelRead(byteBuf)方法去处理。继续看 ChannelPipeline 的默认实现类 DefaultChannelPipeline 中的实现:
@Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }调用了 AbstractChannelHandlerContext#invokeChannelRead()方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }重点就在 invokeChannelRead() 的这一句:
((ChannelInboundHandler) handler()).channelRead(this, msg);最终触发了 ChannelInboundHandler#channelRead(ChannelHandlerContext ctx, Object msg) 方法。