Netty Study Notes (Side Story) - How ChannelHandler, ChannelPipeline, and ChannelHandlerContext Relate (4)

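Here the invokeHandler method checks the state of the current ChannelHandler. If the check passes, the current ChannelHandler's channelRead method is called; if it fails, fireChannelRead is called instead so that the event moves on to the next ChannelHandler. head is of type HeadContext, which itself implements the ChannelInboundHandler interface, so the method called here is HeadContext's channelRead.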

// DefaultChannelPipeline.java
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.fireChannelRead(msg);
    }
}

HeadContext does nothing with the message here; it simply passes the read message down the pipeline. Next, let's look at what ChannelHandlerContext's fireChannelRead does:

// AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}

private static boolean skipContext(
        AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
    // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
    return (ctx.executionMask & (onlyMask | mask)) == 0 ||
            // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
            // everything to preserve ordering.
            //
            // See https://github.com/netty/netty/issues/10067
            (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

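The logic works like this: starting from the current ChannelHandlerContext node in the doubly linked list, search forward (following next) until a ChannelHandlerContext that matches the MASK_CHANNEL_READ mask is found. As we saw in the earlier sections, each ChannelHandlerContext keeps, in its executionMask field, the combined bit mask of every method that its ChannelHandler supports (overrides). A bitwise AND against that value is therefore enough to find the nearest ChannelHandlerContext whose handler implements the method in question.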
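To make the mask lookup concrete, here is a minimal plain-Java sketch of the same idea. It is not Netty's code: the Node class, the demoPipeline helper, and the bit values of the two mask constants are assumptions made up for illustration, and the sketch leaves out the EventExecutor check and the onlyMask argument that skipContext also uses.

```java
public class MaskLookupSketch {
    // Hypothetical bit values chosen for this sketch; Netty defines its own constants.
    static final int MASK_CHANNEL_ACTIVE = 1;
    static final int MASK_CHANNEL_READ = 1 << 1;

    // Simplified stand-in for AbstractChannelHandlerContext.
    static final class Node {
        final String name;
        final int executionMask; // bits for the callbacks this node's handler overrides
        Node next;
        Node prev;

        Node(String name, int executionMask) {
            this.name = name;
            this.executionMask = executionMask;
        }
    }

    // Links the nodes into a doubly linked list and returns the first one.
    static Node link(Node... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
            nodes[i + 1].prev = nodes[i];
        }
        return nodes[0];
    }

    // Walks forward from 'from' (exclusive) to the first node whose mask contains 'mask'.
    // The tail node sets every inbound bit here, so the loop always stops at the tail at the latest.
    static Node findInbound(Node from, int mask) {
        Node ctx = from;
        do {
            ctx = ctx.next;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

    // Builds head -> activeOnly -> reader -> tail.
    static Node demoPipeline() {
        int allInbound = MASK_CHANNEL_ACTIVE | MASK_CHANNEL_READ;
        return link(
                new Node("head", allInbound),
                new Node("activeOnly", MASK_CHANNEL_ACTIVE),
                new Node("reader", MASK_CHANNEL_READ),
                new Node("tail", allInbound));
    }

    public static void main(String[] args) {
        Node head = demoPipeline();
        Node reader = findInbound(head, MASK_CHANNEL_READ);
        System.out.println(reader.name);                                 // reader
        System.out.println(findInbound(reader, MASK_CHANNEL_READ).name); // tail
    }
}
```

The "activeOnly" node is skipped for a read event because its mask has no MASK_CHANNEL_READ bit, which is the same effect the real skipContext achieves for handlers that do not override channelRead.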
The last node in the linked list is TailContext:

// DefaultChannelPipeline.java
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(ctx, msg);
    }

    /**
     * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
     * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
     * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
     */
    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline : {}. Channel : {}.",
                    ctx.pipeline().names(), ctx.channel());
        }
    }

    /**
     * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
     * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
     * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
     */
    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

As we can see, the tail node's channelRead method does not pass the event any further; it only releases msg.
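The practical consequence is that a message is only released automatically if it actually travels all the way to the tail. The following plain-Java sketch models that under stated assumptions: CountedMessage, InboundStep, deliver, and refCntAfter are hypothetical names invented for this example (a stand-in for a reference-counted message and a list of inbound handlers), not Netty types.

```java
import java.util.Arrays;
import java.util.List;

public class TailReleaseSketch {
    // Hypothetical stand-in for a reference-counted message such as a ByteBuf.
    static final class CountedMessage {
        private int refCnt = 1;

        int refCnt() {
            return refCnt;
        }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    // One inbound handler; returns true if it forwards the message to the next node.
    interface InboundStep {
        boolean channelRead(CountedMessage msg);
    }

    // Passes the message through the handlers in order; the "tail" releases it if it gets that far.
    static void deliver(List<InboundStep> steps, CountedMessage msg) {
        for (InboundStep step : steps) {
            if (!step.channelRead(msg)) {
                return; // not forwarded: this handler is now responsible for the message
            }
        }
        msg.release(); // reached the tail without being consumed
    }

    // Delivers a fresh message and reports its reference count afterwards.
    static int refCntAfter(List<InboundStep> steps) {
        CountedMessage msg = new CountedMessage();
        deliver(steps, msg);
        return msg.refCnt();
    }

    public static void main(String[] args) {
        InboundStep passThrough = msg -> true;
        InboundStep swallow = msg -> false; // stops propagation and forgets to release
        System.out.println(refCntAfter(Arrays.asList(passThrough, passThrough))); // 0
        System.out.println(refCntAfter(Arrays.asList(passThrough, swallow)));     // 1
    }
}
```

In the second case the count stays at 1, which models a leak: a handler that neither forwards nor releases the message leaves nobody to clean it up.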

Writing a message

Let's trace this through the ctx.write call in the channelReadComplete method of OneChannelInBoundHandler:

// AbstractChannelHandlerContext.java
// code omitted
@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    write(msg, false, promise);
    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}

private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.prev;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}
// code omitted
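Note that findContextOutbound follows prev, so a write issued through ctx.write travels from the current context toward the head, and outbound handlers that sit after the current context are never visited. The plain-Java sketch below illustrates that direction only. The Node class, the node names, and the mask bit values are assumptions for this example rather than Netty's definitions, and the executor check is again left out.

```java
public class OutboundLookupSketch {
    // Hypothetical bit values chosen for this sketch; Netty defines its own constants.
    static final int MASK_WRITE = 1;
    static final int MASK_FLUSH = 1 << 1;

    // Simplified stand-in for AbstractChannelHandlerContext; only the prev link is modelled.
    static final class Node {
        final String name;
        final int executionMask; // bits for the outbound callbacks this node's handler overrides
        final Node prev;

        Node(String name, int executionMask, Node prev) {
            this.name = name;
            this.executionMask = executionMask;
            this.prev = prev;
        }
    }

    // Pipeline order: head <-> encoder <-> business <-> audit <-> tail
    static final Node HEAD = new Node("head", MASK_WRITE | MASK_FLUSH, null);
    static final Node ENCODER = new Node("encoder", MASK_WRITE, HEAD);
    static final Node BUSINESS = new Node("business", 0, ENCODER); // inbound-only handler
    static final Node AUDIT = new Node("audit", MASK_WRITE, BUSINESS);
    static final Node TAIL = new Node("tail", 0, AUDIT);

    // Walks backward from 'from' (exclusive) to the first node whose mask shares a bit with 'mask'.
    // The head node handles every outbound operation here, so the loop stops at the head at the latest.
    static Node findOutbound(Node from, int mask) {
        Node ctx = from;
        do {
            ctx = ctx.prev;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

    public static void main(String[] args) {
        // Starting at "business" skips "audit", which sits later in the pipeline.
        System.out.println(findOutbound(BUSINESS, MASK_WRITE).name);             // encoder
        // Starting at the tail visits every outbound handler, beginning with "audit".
        System.out.println(findOutbound(TAIL, MASK_WRITE).name);                 // audit
        System.out.println(findOutbound(ENCODER, MASK_WRITE | MASK_FLUSH).name); // head
    }
}
```

This is why the choice of entry point matters: a write that begins at the tail, which is where a write issued on the pipeline itself starts, passes through every outbound handler, while ctx.write only reaches the ones in front of the calling handler.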
