近期一直在做网络协议相关的工作,所以博客也就与之相关的比较多,今天楼主结合 Redis的协议 RESP 看看在 Netty 源码中是如何实现的。
RESP 是 Redis 序列化协议的简写。它是一种直观的文本协议,优势在于实现非常简单,解析性能极好。
Redis 协议将传输的结构数据分为 5 种最小单元类型,单元结束时统一加上回车换行符号\r\n,来表示该单元的结束。
单行字符串 以 + 符号开头。
多行字符串 以 $ 符号开头,后跟字符串长度。
整数值 以 : 符号开头,后跟整数的字符串形式。
错误消息 以 - 符号开头。
数组 以 * 号开头,后跟数组的长度。
关于 RESP 协议的具体介绍感兴趣的小伙伴请移步楼主的另一篇文章Redis协议规范(译文)
Netty 中 RESP 协议的定义如下面代码中所表示的,Netty中使用对应符号的ASCII码来表示,感兴趣的小伙伴可以查一下ASCII码表来验证一下。
public enum RedisMessageType { // 以 + 开头的单行字符串 SIMPLE_STRING((byte)43, true), // 以 - 开头的错误信息 ERROR((byte)45, true), // 以 : 开头的整型数据 INTEGER((byte)58, true), // 以 $ 开头的多行字符串 BULK_STRING((byte)36, false), // 以 * 开头的数组 ARRAY_HEADER((byte)42, false), ARRAY((byte)42, false); private final byte value; private final boolean inline; private RedisMessageType(byte value, boolean inline) { this.value = value; this.inline = inline; } public byte value() { return this.value; } public boolean isInline() { return this.inline; } public static RedisMessageType valueOf(byte value) { switch(value) { case 36: return BULK_STRING; case 42: return ARRAY_HEADER; case 43: return SIMPLE_STRING; case 45: return ERROR; case 58: return INTEGER; default: throw new RedisCodecException("Unknown RedisMessageType: " + value); } } } Netty 中 RESP 解码器实现解码器,顾名思义,就是将服务器返回的数据根据协议反序列化成易于阅读的信息。RedisDecoder 就是根据 RESP 将服务端返回的信息反序列化出来。下面是指令的编码格式
SET key value => *3\r\n$5\r\nSET\r\n$1\r\nkey\r\n$1\r\nvalue\r\n指令是一个字符串数组,编码一个字符串数组,首先需要编码数组长度*3\r\n。然后依次编码各个字符串参数。编码字符串首先需要编码字符串的长度$5\r\n。然后再编码字符串的内容SET\r\n。Redis 消息以\r\n作为分隔符,这样设计其实挺浪费网络传输流量的,消息内容里面到处都是\r\n符号。但是这样的消息可读性会比较好,便于调试。RESP 协议是牺牲性能换取可读,易于实现的一个经典例子。
指令解码器的实现,网络字节流的读取存在拆包问题。所拆包问题是指一次Read调用从套件字读到的字节数组可能只是一个完整消息的一部分。而另外一部分则需要发起另外一次Read调用才可能读到,甚至要发起多个Read调用才可以读到完整的一条消息。对于拆包问题感兴趣的小伙伴可以查看楼主的另一篇文章TCP 粘包问题浅析及其解决方案
如果我们拿部分消息去反序列化成输入消息对象肯定是要失败的,或者说生成的消息对象是不完整填充的。这个时候我们需要等待下一次Read调用,然后将这两次Read调用的字节数组拼起来,尝试再一次反序列化。
问题来了,如果一个输入消息对象很大,就可能需要多个Read调用和多次反序列化操作才能完整的解包出一个输入对象。那这个反序列化的过程就会重复了多次。
针对这个问题,Netty 中很巧妙的解决了这个问题,如下所示,Netty 中通过 state 属性来保存当前序列化的状态,然后下次反序列化的时候就可以从上次记录的 state 直接继续反序列化。这样就避免了重复的问题。
// 保持当前序列化状态的字段 private RedisDecoder.State state; public RedisDecoder() { this(65536, FixedRedisMessagePool.INSTANCE); } public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) { this.toPositiveLongProcessor = new RedisDecoder.ToPositiveLongProcessor(); // 默认初始化状态为,反序列化指令类型 this.state = RedisDecoder.State.DECODE_TYPE; if (maxInlineMessageLength > 0 && maxInlineMessageLength <= 536870912) { this.maxInlineMessageLength = maxInlineMessageLength; this.messagePool = messagePool; } else { throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength + " (expected: <= " + 536870912 + ")"); } } // 解码器的主要业务逻辑 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { try { // 循环读取信息,将信息完成的序列化 while(true) { switch(this.state) { case DECODE_TYPE: if (this.decodeType(in)) { break; } return; case DECODE_INLINE: if (this.decodeInline(in, out)) { break; } return; case DECODE_LENGTH: if (this.decodeLength(in, out)) { break; } return; case DECODE_BULK_STRING_EOL: if (this.decodeBulkStringEndOfLine(in, out)) { break; } return; case DECODE_BULK_STRING_CONTENT: if (this.decodeBulkStringContent(in, out)) { break; } return; default: throw new RedisCodecException("Unknown state: " + this.state); } } } catch (RedisCodecException var5) { this.resetDecoder(); throw var5; } catch (Exception var6) { this.resetDecoder(); throw new RedisCodecException(var6); } }