下述代码展示了这个逻辑
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> { @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { while (msg.readableBytes() >= 4) { //从输入的 ByteBuf 中读取下一个整数,并且计算其绝对值 int value = Math.abs(msg.readInt()); //将该整数写入到编码消息的 List 中 out.add(value); } } }使用 EmbeddedChannel 来测试代码
public class AbsIntegerEncoderTest { @Test public void testEncoded() { ByteBuf buf = Unpooled.buffer(); for (int i = 1; i < 10; i++) { buf.writeInt(i * -1); } // 创建一个 EmbeddedChanel,并安装一个要测试的 AbsIntegerEncoder EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder()); // 写入 ByteBuf,调用 readOutbound() 方法将会产生数据 System.out.println(channel.writeOutbound(buf)); System.out.println(channel.finish()); channel.readOutbound(); for (int i = 1; i < 10; i++) { int temp = channel.readOutbound(); System.out.println(temp); } System.out.println(channel.readOutbound() == null); } }下面是代码中执行的步骤。
将 4 字节的负整数写到一个新的 ByteBuf 中
创建一个 EmbeddedChannel,并为它分配一个 AbsIntegerEncoder
调用 EmbeddedChannel 上的 writeOutbound()方法来写入该 ByteBuf
标记该 Channel 为已完成状态
从 EmbeddedChannel 的出站端读取所有的整数,并验证是否只产生了绝对值
测试异常处理
应用程序通常需要执行比转换数据更加复杂的任务。例如,你可能需要处理格式不正确的输 入或者过量的数据。在下一个示例中,如果所读取的字节数超出了某个特定的限制,我们将会抛出一个 TooLongFrameException,这是一种经常用来防范资源被耗尽的方法
实现的代码如下
// 扩展 ByteToMessageDecoder 以将入站字节码为消息 public class FrameChunkDecoder extends ByteToMessageDecoder { private final int maxFrameSize; public FrameChunkDecoder(int maxFrameSize) { this.maxFrameSize = maxFrameSize; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int readableBytes = in.readableBytes(); if (readableBytes > maxFrameSize) { // 如果该帧太大,则丢弃它并抛出一个 TooLongFrameException in.clear(); throw new TooLongFrameException(); } // 否则,从 ByteBuf 中读取一个新的帧 ByteBuf buf = in.readBytes(readableBytes); // 该帧添加到解码消息的List中 out.add(buf); } }再使用 EmbeddedChannel 来测试这段代码
public class FrameChunkDecoderTest { @Test public void testFramesDecoded() { ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } ByteBuf input = buf.duplicate(); // 创建一个 EmbeddedChannel,并向其安装一个帧大小为 3 字节的 FixedLengthFrameDecoder EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3)); System.out.println(channel.writeInbound(input.readBytes(2))); try { // 写入一个 4 字节大小的帧,并捕获预期的异常 channel.writeInbound(input.readBytes(4)); } catch (TooLongFrameException e) { e.printStackTrace(); } // 写入剩余的 2 字节,会产生一个有效帧 System.out.println(channel.writeInbound(input.readBytes(3)));//true System.out.println(channel.finish()); // 读取产生的消息,并且验证值 ByteBuf read = channel.readInbound(); System.out.println(read.equals(buf.readSlice(2)));//true read.release(); read = channel.readInbound(); System.out.println(read.equals(buf.skipBytes(4).readSlice(3)));//true read.release(); buf.release(); } }