Netty编码流程及WriteAndFlush()的实现

编码器的执行时机

首先, 我们想通过服务端,往客户端发送数据, 通常我们会调用ctx.writeAndFlush(数据)的方式, 入参位置的数据可能是基本数据类型,也可能对象

其次,编码器同样属于handler,只不过他是特化的专门用于编码作用的handler, 在我们的消息真正写入jdk底层的ByteBuffer时前,数据需要经过编码处理, 不是说不进行编码就发送不出去,而是不经过编码,客户端可能接受到的是乱码

然后,我们知道,ctx.writeAndFlush(数据)它其实是出站处理器特有的行为,因此注定了它需要在pipeline中进行传递,从哪里进行传递呢? 从tail节点开始,一直传播到header之前的我们自己添加的自定义的解码器中

WriteAndFlush()的逻辑

我们跟进源码WriteAndFlush()相对于Write(),它的flush字段是true

private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //todo 因为flush 为 true next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); }

于是就会这样

逐个调用handler的write()

逐个调用handler的flush()

知道这一点很重要,这意味这我们知道了,事件传播分成两波进行, 一波write,一波flush, 这两波事件传播的大体流程我写在这里, 在下面

write

将ByteBuf 转换成DirctBuffer

将消息(DirctBuffer)封装进entry 插入写队列

设置写状态

flush

刷新标志,设置写状态

变量buffer队列,过滤Buffer

调用jdk底层的api,把ByteBuf写入jdk原生的ByteBuffer

自定义一个简单的编码器 /** * @Author: Changwu * @Date: 2019/7/21 20:49 */ public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol> { // todo write动作会传播到 MyPersonEncoder的write方法, 但是我们没有重写, 于是就执行 父类 MessageToByteEncoder的write, 我们进去看 @Override protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception { System.out.println("MyPersonEncoder...."); // 消息头 长度 out.writeInt(msg.getLength()); // 消息体 out.writeBytes(msg.getContent()); } }

选择继承MessageToByteEncoder<T> 从消息到字节的编码器

继续跟进

ok,现在来到了我们自定义的 解码器MyPersonEncoder ,

但是,并没看到正在传播的writeAndFlush(),没关系, 我们自己的解码器继承了MessageToByteEncoder,这个父类中实现了writeAndFlush(),源码如下:解析写在源码后面

// todo 看他的write方法 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {// todo 1 判断当前是否可以处理这个对象 @SuppressWarnings("unchecked") I cast = (I) msg; // todo 2 内存分配 buf = allocateBuffer(ctx, cast, preferDirect); try { // todo 3 调用本类的encode(), 这个方法就是我们自己实现的方法 encode(ctx, cast, buf); } finally { // todo 4 释放 ReferenceCountUtil.release(cast); } if (buf.isReadable()) { // todo 5. 往前传递 ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { // todo 释放 buf.release(); } }

将我们发送的消息msg,封装进了 ByteBuf 中

编码: 执行encode()方法,这是个抽象方法,由我们自定义的编码器实现

我们的实现很简单,分别往Buf里面写入下面两次数据

int类型的消息的长度

消息体

将msg释放

继续向前传递 write()事件

最终,释放第一步创建的ByteBuf

小结

到这里为止,编码器的执行流程已经完成了,我们可以看到,和解码器的架构逻辑相似,类似于模板设计模式,对我们来说,只不过是做了个填空题

其实到上面的最后一步 释放第一步创建的ByteBuf之前 ,消息已经被写到jdk底层的 ByteBuffer 中了,怎么做的呢? 别忘了它的上一步, 继续向前传递write()事件,再往前其实就是HeaderContext了,和HeaderContext直接关联的就是unsafe类, 这并不奇怪,我们都知道,netty中无论是客户端还是服务端channel底层的数据读写,都依赖unsafe

下面开始分析,WriteAndFlush()底层的两波任务细节

第一波事件传递 write()

我们跟进HenderContext的write() ,而HenderContext的中依赖的是unsafe.wirte()所以直接去 AbstractChannel的Unsafe 源码如下:

@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // todo 缓存 写进来的 buffer ReferenceCountUtil.release(msg); return; } int size; try { // todo buffer Dirct化 , (我们查看 AbstractNioByteBuf的实现) msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // todo 插入写队列 将 msg 插入到 outboundBuffer // todo outboundBuffer 这个对象是 ChannelOutBoundBuf类型的,它的作用就是起到一个容器的作用 // todo 下面看, 是如何将 msg 添加进 ChannelOutBoundBuf中的 outboundBuffer.addMessage(msg, size, promise); }

参数位置的msg,就是经过我们自定义解码器的父类进行包装了的ByteBuf类型消息

这个方法主要做了三件事

第一: filterOutboundMessage(msg); 将ByteBuf转换成DirctByteBuf

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwsfwz.html