下面代码主要是实现对应消息按照 RESP 协议 进行序列化操作,具体就是上面楼主说的,分配一个ByteBuf,然后将将消息输出对象序列化的字节数组塞到ByteBuf中输出即可。
private static void writeInlineCommandMessage(ByteBufAllocator allocator, InlineCommandRedisMessage msg, List<Object> out) { writeString(allocator, RedisMessageType.INLINE_COMMAND, msg.content(), out); } private static void writeSimpleStringMessage(ByteBufAllocator allocator, SimpleStringRedisMessage msg, List<Object> out) { writeString(allocator, RedisMessageType.SIMPLE_STRING, msg.content(), out); } private static void writeErrorMessage(ByteBufAllocator allocator, ErrorRedisMessage msg, List<Object> out) { writeString(allocator, RedisMessageType.ERROR, msg.content(), out); } private static void writeString(ByteBufAllocator allocator, RedisMessageType type, String content, List<Object> out) { ByteBuf buf = allocator.ioBuffer(type.length() + ByteBufUtil.utf8MaxBytes(content) + RedisConstants.EOL_LENGTH); type.writeTo(buf); ByteBufUtil.writeUtf8(buf, content); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } private void writeIntegerMessage(ByteBufAllocator allocator, IntegerRedisMessage msg, List<Object> out) { ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.INTEGER.writeTo(buf); buf.writeBytes(numberToBytes(msg.value())); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } private void writeBulkStringHeader(ByteBufAllocator allocator, BulkStringHeaderRedisMessage msg, List<Object> out) { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + (msg.isNull() ? RedisConstants.NULL_LENGTH : RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH)); RedisMessageType.BULK_STRING.writeTo(buf); if (msg.isNull()) { buf.writeShort(RedisConstants.NULL_SHORT); } else { buf.writeBytes(numberToBytes(msg.bulkStringLength())); buf.writeShort(RedisConstants.EOL_SHORT); } out.add(buf); } private static void writeBulkStringContent(ByteBufAllocator allocator, BulkStringRedisContent msg, List<Object> out) { out.add(msg.content().retain()); if (msg instanceof LastBulkStringRedisContent) { out.add(allocator.ioBuffer(RedisConstants.EOL_LENGTH).writeShort(RedisConstants.EOL_SHORT)); } } private void writeFullBulkStringMessage(ByteBufAllocator allocator, FullBulkStringRedisMessage msg, List<Object> out) { if (msg.isNull()) { ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.BULK_STRING.writeTo(buf); buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } else { ByteBuf headerBuf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.BULK_STRING.writeTo(headerBuf); headerBuf.writeBytes(numberToBytes(msg.content().readableBytes())); headerBuf.writeShort(RedisConstants.EOL_SHORT); out.add(headerBuf); out.add(msg.content().retain()); out.add(allocator.ioBuffer(RedisConstants.EOL_LENGTH).writeShort(RedisConstants.EOL_SHORT)); } } /** * Write array header only without body. Use this if you want to write arrays as streaming. */ private void writeArrayHeader(ByteBufAllocator allocator, ArrayHeaderRedisMessage msg, List<Object> out) { writeArrayHeader(allocator, msg.isNull(), msg.length(), out); } /** * Write full constructed array message. */ private void writeArrayMessage(ByteBufAllocator allocator, ArrayRedisMessage msg, List<Object> out) { if (msg.isNull()) { writeArrayHeader(allocator, msg.isNull(), RedisConstants.NULL_VALUE, out); } else { writeArrayHeader(allocator, msg.isNull(), msg.children().size(), out); for (RedisMessage child : msg.children()) { writeRedisMessage(allocator, child, out); } } } private void writeArrayHeader(ByteBufAllocator allocator, boolean isNull, long length, List<Object> out) { if (isNull) { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.NULL_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.ARRAY_HEADER.writeTo(buf); buf.writeShort(RedisConstants.NULL_SHORT); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } else { final ByteBuf buf = allocator.ioBuffer(RedisConstants.TYPE_LENGTH + RedisConstants.LONG_MAX_LENGTH + RedisConstants.EOL_LENGTH); RedisMessageType.ARRAY_HEADER.writeTo(buf); buf.writeBytes(numberToBytes(length)); buf.writeShort(RedisConstants.EOL_SHORT); out.add(buf); } } 小结对于 Netty 源码,楼主一直是一种敬畏的态度,没想到今天竟然从另一个方面对 Netty 的冰山一角展开解读,毕竟万事开头难,有了这一次希望之后可以更顺利,在技术成长的道路上一起加油。
参考链接Redis协议规范(译文)
TCP 粘包问题浅析及其解决方案
基于Netty实现Redis协议的编码解码器