Netty 源码中对 Redis 协议的实现 (2)

下面代码中,是针对每种数据类型进行反序列化的具体业务逻辑。有小伙伴可能会想,没有看到解码胡数组类型的逻辑呢?实际上在 RESP 协议中数组就是其他类型的组合,所以完全可以循环读取,按照单个元素解码。

// 解码消息类型 private boolean decodeType(ByteBuf in) throws Exception { if (!in.isReadable()) { return false; } else { this.type = RedisMessageType.valueOf(in.readByte()); this.state = this.type.isInline() ? RedisDecoder.State.DECODE_INLINE : RedisDecoder.State.DECODE_LENGTH; return true; } } // 解码单行字符串,错误信息,或者整型数据类型 private boolean decodeInline(ByteBuf in, List<Object> out) throws Exception { ByteBuf lineBytes = readLine(in); if (lineBytes == null) { if (in.readableBytes() > this.maxInlineMessageLength) { throw new RedisCodecException("length: " + in.readableBytes() + " (expected: <= " + this.maxInlineMessageLength + ")"); } else { return false; } } else { out.add(this.newInlineRedisMessage(this.type, lineBytes)); this.resetDecoder(); return true; } } // 解码消息长度 private boolean decodeLength(ByteBuf in, List<Object> out) throws Exception { ByteBuf lineByteBuf = readLine(in); if (lineByteBuf == null) { return false; } else { long length = this.parseRedisNumber(lineByteBuf); if (length < -1L) { throw new RedisCodecException("length: " + length + " (expected: >= " + -1 + ")"); } else { switch(this.type) { case ARRAY_HEADER: out.add(new ArrayHeaderRedisMessage(length)); this.resetDecoder(); return true; case BULK_STRING: if (length > 536870912L) { throw new RedisCodecException("length: " + length + " (expected: <= " + 536870912 + ")"); } this.remainingBulkLength = (int)length; return this.decodeBulkString(in, out); default: throw new RedisCodecException("bad type: " + this.type); } } } } // 解码多行字符串 private boolean decodeBulkString(ByteBuf in, List<Object> out) throws Exception { switch(this.remainingBulkLength) { case -1: out.add(FullBulkStringRedisMessage.NULL_INSTANCE); this.resetDecoder(); return true; case 0: this.state = RedisDecoder.State.DECODE_BULK_STRING_EOL; return this.decodeBulkStringEndOfLine(in, out); default: out.add(new BulkStringHeaderRedisMessage(this.remainingBulkLength)); this.state = RedisDecoder.State.DECODE_BULK_STRING_CONTENT; return this.decodeBulkStringContent(in, out); } } Netty 中 RESP 编码器实现

编码器,顾名思义,就是将对象根据 RESP 协议序列化成字节流发送到服务端。编码器的实现非常简单,不用考虑拆包等问题,就是分配一个ByteBuf,然后将将消息输出对象序列化的字节数组塞到ByteBuf中输出就可以了。

下面代码中就是 encode 方法直接调用 writeRedisMessage 方法,根据消息类型进行写buffer操作。

@Override protected void encode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) throws Exception { try { writeRedisMessage(ctx.alloc(), msg, out); } catch (CodecException e) { throw e; } catch (Exception e) { throw new CodecException(e); } } private void writeRedisMessage(ByteBufAllocator allocator, RedisMessage msg, List<Object> out) { // 判断消息类型,然后调用写相应消息的方法。 if (msg instanceof InlineCommandRedisMessage) { writeInlineCommandMessage(allocator, (InlineCommandRedisMessage) msg, out); } else if (msg instanceof SimpleStringRedisMessage) { writeSimpleStringMessage(allocator, (SimpleStringRedisMessage) msg, out); } else if (msg instanceof ErrorRedisMessage) { writeErrorMessage(allocator, (ErrorRedisMessage) msg, out); } else if (msg instanceof IntegerRedisMessage) { writeIntegerMessage(allocator, (IntegerRedisMessage) msg, out); } else if (msg instanceof FullBulkStringRedisMessage) { writeFullBulkStringMessage(allocator, (FullBulkStringRedisMessage) msg, out); } else if (msg instanceof BulkStringRedisContent) { writeBulkStringContent(allocator, (BulkStringRedisContent) msg, out); } else if (msg instanceof BulkStringHeaderRedisMessage) { writeBulkStringHeader(allocator, (BulkStringHeaderRedisMessage) msg, out); } else if (msg instanceof ArrayHeaderRedisMessage) { writeArrayHeader(allocator, (ArrayHeaderRedisMessage) msg, out); } else if (msg instanceof ArrayRedisMessage) { writeArrayMessage(allocator, (ArrayRedisMessage) msg, out); } else { throw new CodecException("unknown message type: " + msg); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsffdd.html