任何数据类型想在网络中进行传输,都得经过编解码转换成字节流
在netty中,服务端和客户端进行通信的其实是下面这样的
程序 ---编码--> 网络
网路 ---解码--> 程序
对应服务端:
入站数据, 经过解码器解码后给后续的handler使用
出站数据, 结果编码器编码成字节流给在网络上传播
在netty中的编码器其实就是一个handler,回想一下,无论是编写服务端的代码,还是客户端的代码,总会通过一个channelIniteializer往pipeline中动态的添加多个处理器,在添加我们自定义的处理器之前,往往会添加编解码器,其实说白了,编解码器其实就是特定功能的handler
我们这样做是有目的的,因为第一步就得需要把字节流转换成我们后续的handler中能处理的常见的数据类型
Netty中的编解码器太多了,下面就用常用的ByteToMessageDecoder介绍他的体系
编码器的模板基类ByteToMessageDecoderByteToMessageDecoder继承了ChannelInboundHandlerAdapter 说明它是处理出站方向数据的编码器,而且它也因此是一个不折不扣的Handler,在回想,其实In开头的handler都是基于事件驱动的,被动的处理器,当客户端发生某种事件时,它对应有不同的动作回调,而且它的特色就是 fireXXX往下传递事件, 带回我们就能看到,netty用它把处理好的数据往下传递
架构概述ByteToMessageDecoder本身是一个抽象类,但是它只有一个抽象方法decode()
netty中的解码器的工作流程如下:
累加字节流
调用子类的decode()方法进行解码
将解析完成的ByteBuf往后传递
既然是入栈处理器,有了新的数据,channelRead()就会被回调,我们去看一下它的channelRead()
下面是它的源码,
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { // todo 在这里判断, 是否是 ByteBuf类型的,如果是,进行解码,不是的话,简单的往下传播下去 CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; // todo 进入查看 cumulation是类型 累加器,其实就是往 ByteBuf中 write数据,并且,当ByteBuf 内存不够时进行扩容 first = cumulation == null; // todo 如果为空, 则说明这是第一次进来的数据, 从没累加过 if (first) { cumulation = data; // todo 如果是第一次进来,直接用打他将累加器初始化 } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); // todo 非第一次进来,就进行累加 } // todo , 这是第二部, 调用子类的decode()进行解析 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); // todo 调用 fireChannelRead,向后船舶channelRead事件, 前面的学习也知道, 她会从当前节点,挨个回调pipeline中处理器的CHannelRead方法 fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); }其实三步工作流程就在上面的代码中
累加字节流 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
调用子类的decode()进行解析 callDecode(ctx, cumulation, out);
将解析完成的ByteBuf往后传递fireChannelRead(ctx, out, size);
它的设计很清晰, 由ByteToMessageDecoder完成整个编码器的模板,规定好具体的处理流程,首先它负责字节流的累加工作,但是具体如何进行解码,由不同的子类去实现,因此它设及成了唯一的抽象方法,在他的模板中,子类将数据解码完成后,它再将数据传播下去
什么是累加器cumulation?源码如下:我们可以看到,其实他就是一个辅助对象, 里面维护了一个 ByteBuf的引用
所谓累加,就是往ByteBuf中write数据
所谓维护,就是 动态判断ByteBuf中可写入的区域大小和将写入的字节的关系
最后,为了防止内存泄露,将收到的ByteBuf 释放
// todo 创建一个累加器 public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { final ByteBuf buffer; // todo 如果 writerIndex + readableBytes > cumulation.maxCapacity 说明已经无法继续累加了 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { // todo 扩容 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } // todo 往 ByteBuf中写入数据 完成累加 buffer.writeBytes(in); // todo 累加完成之后,原数据 释放掉 in.release(); return buffer; } }; 第二步,callDecode(ctx, cumulation, out)