Netty 中的消息解析和编解码器 (3)

SIZE_TABLE 里面的值是干啥用的呢,刚才提到会将 byte 数据先预读到缓冲区,初始默认大小为1024,当目前没有这么多字节需要读的时候,会动态缩小缓冲区,而预判待读取的字节有很多的时候会扩大缓冲区。

动态预估下一次可能会有多少数据待读取的操作在哪里呢?还是回到 read()方法,while 循环完一轮之后,会执行一句:

allocHandle.readComplete();

对应到 AdaptiveRecvByteBufAllocator 中:

@Override public void readComplete() { record(totalBytesRead()); } //根据当前的actualReadBytes大小,对nextReceiveBufferSize进行更新 private void record(int actualReadBytes) { //如果actualReadBytes 小于 当前索引-INDEX_DECREMENT-1 的值,说明容量需要缩减 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { //则取 当前索引-INDEX_DECREMENT 与 minIndex的最大值 index = Math.max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } //读到的值大于缓冲大小 } else if (actualReadBytes >= nextReceiveBufferSize) { // INDEX_INCREMENT=4 index前进4 index = Math.min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } }

通过上一次的流大小来预测下一次的流大小,可针对不同的应用场景来进行缓冲区的分配。像IM消息可能是几K ,文件传输可能是几百M,不同的场景用到的内存缓冲大小不一样对性能的影响也不同。如果所有的场景都是同一种内存空间分配,客户端连接多的情况下,线程数过多可能导致内存溢出。

3. Netty 中的编解码器

上面两小节聊到消息从哪里来,默认消息格式为 ByteBuf,缓冲区大小默认为1024,会动态预估下次缓冲区大小。下面我们就正式来说一下编解码相关的内容,编解码相关的源码都在 codec 包中:

Netty 中的消息解析和编解码器

因为编码器要实现的是对输出的内容编码,都是实现 ChannelOutboundHandler 接口,解码器对接收的内容解码,都是实现 ChannelInboundHandler 接口,所以可以完全适配 ChannelPipeline 将编解码器作为一种插件的形式做一些灵活的搭配。

3.1 decoder

解码器负责将输入的消息解析为指定的格式。消息输入都来自inbound,即继承 ChannelInboundHandler 接口,顶级的解码器有两种类型:

将字节解码为消息:ByteToMessageDecoder

将一种消息类型解码为另一种 类型:MessageToMessageDecoder。

Netty 中的消息解析和编解码器

字节码解析为消息这应该是最普通,最基本的使用方式,这里所谓的字节码就是上面我们讲到的 ByteBuf 序列,默认包含1024字节的字节数组。关于 ByteToMessageDecoder 的分析上一节在讲粘包的时候顺带提及,大家有兴趣可以回去看看:ByteToMessageDecoder 分析

MessageToMessageDecoder 更好理解,比如消息的类型为Integer,需要将 Integer 转为 String。那么就可以继承 MessageToMessageDecoder 实现自己的转换方法。我们先简单看一下它的实现:

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CodecOutputList out = CodecOutputList.newInstance(); try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; try { decode(ctx, cast, out); } finally { ReferenceCountUtil.release(cast); } } else { out.add(msg); } } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.getUnsafe(i)); } out.recycle(); } } protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

上面的 channelRead()方法中将 msg 转为消息原本的类型,然后进入 decode()方法。 decode() 是一个抽象方法,言意之下你想转为啥类型,你就实现该方法去转便是。

3.2 encoder

编码器主要的作用是将出站事件的消息按照指定格式编码输出。那么编码器应该是继承 outBound 事件,看一下主要的类图:

Netty 中的消息解析和编解码器

编码器的基本类型与解码器相反:将对象拆解为字节,将对象编码为另一种对象。

关于基本编解码器的使用和自定义编解码器上一节我们已经讲过,这里就不再复述。下一篇单独看看在 Netty 中使用protobuf编码格式进行数据传输。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzzjyz.html