为了避免干扰因素,这里的代码去除了一些,辅助行的字段和方法,AmpMessage其实是主要用于描述头信息的,并且包含body的buffer数据 Data字段,并实现获取消息体Length的方法(用于发送消息时,计算缓冲区)
public class AmpMessage : Peach.Messaging.IMessage { /// <summary> /// 第一个版本为18个字节头固定长度 /// </summary> public const int VERSION_0_HEAD_LENGTH = 18; /// <summary> /// 现有版本21个字节头固定长度 /// </summary> public const int VERSION_1_HEAD_LENGTH = 21; /// <summary> /// 状态码 /// </summary> public int Code { get; set; } //0 默认为Protobuf 1 MessagePack 2 = JSON public CodecType CodecType { get; set; } /// <summary> /// 实际的请求数据 /// </summary> public byte[] Data { get; set; } public int Length { get { var hl = Version == 0 ? VERSION_0_HEAD_LENGTH : VERSION_1_HEAD_LENGTH; if(Data == null) { return hl; } return hl + this.Data.Length; } } /// <summary> /// 消息标识 /// </summary> public string Id => $"{ServiceId}|{MessageId}|{Sequence}"; /// <summary> /// 调用服务的唯一消息号 确定哪个方法 /// </summary> public ushort MessageId { get; set; } /// <summary> /// 请求的序列号 /// </summary> public int Sequence { get; set; } /// <summary> /// 调用服务的唯一服务号 确定哪个服务 /// </summary> public int ServiceId { get; set; } /// <summary> /// 协议版本0/1 /// </summary> public byte Version { get; set; } public InvokeMessageType InvokeMessageType { get; set; } } public enum InvokeMessageType : byte { Request = 1, Response = 2, Notify = 3, OnewayRequest=4 //请求且不等待回复 } 3.3.2 AmpProtocol的实现AmpProtocol中的实现主要是对ProtocolMeta描述,代码中已有详细注释,至于打包和解包,就是根据协议Write或者Read对应的数据类型即可
/// <summary> /// Amp Protocol /// </summary> public class AmpProtocol : IProtocol<AmpMessage> { private readonly ISerializer _serializer; public AmpProtocol(ISerializer serializer) { this._serializer = serializer; } static readonly ProtocolMeta AMP_PROTOCOL_META = new ProtocolMeta { InitialBytesToStrip = 0, //读取时需要跳过的字节数 LengthAdjustment = -5, //包实际长度的纠正,如果包长包括包头和包体,则要减去Length之前的部分 LengthFieldLength = 4, //长度字段的字节数 整型为4个字节 LengthFieldOffset = 1, //长度属性的起始(偏移)位 MaxFrameLength = int.MaxValue, //最大的数据包字节数 HeartbeatInterval = 30 * 1000 // 30秒没消息发一个心跳包 }; public ProtocolMeta GetProtocolMeta() { return AMP_PROTOCOL_META; } public void Pack(IBufferWriter writer, AmpMessage message) { writer.WriteByte(message.Version); writer.WriteInt(message.Length); writer.WriteInt(message.Sequence); writer.WriteByte((byte)message.InvokeMessageType); if (message.Version == 0) { writer.WriteUShort((ushort)message.ServiceId); } else { writer.WriteInt(message.ServiceId); } writer.WriteUShort(message.MessageId); writer.WriteInt(message.Code); if(message.Version == 1) { writer.WriteByte(_serializer.CodecType); } if (message.Data != null) { writer.WriteBytes(message.Data); } } public AmpMessage Parse(IBufferReader reader) { if (reader.ReadableBytes == 0) { return null; } var msg = new AmpMessage {Version = reader.ReadByte()}; int headLength; if (msg.Version == 0 ) { headLength = AmpMessage.VERSION_0_HEAD_LENGTH; if (reader.ReadableBytes < AmpMessage.VERSION_0_HEAD_LENGTH - 1) { throw new RpcCodecException($"decode error ,ReadableBytes={reader.ReadableBytes+1},HEAD_LENGTH={AmpMessage.VERSION_0_HEAD_LENGTH}"); } } else if (msg.Version == 1 ) { headLength = AmpMessage.VERSION_1_HEAD_LENGTH; if (reader.ReadableBytes < AmpMessage.VERSION_1_HEAD_LENGTH - 1) { throw new RpcCodecException($"decode error ,ReadableBytes={reader.ReadableBytes+1},HEAD_LENGTH={AmpMessage.VERSION_1_HEAD_LENGTH}"); } } else { throw new RpcCodecException($"decode error ,{msg.Version} is not support"); } var length = reader.ReadInt(); msg.Sequence = reader.ReadInt(); var type = reader.ReadByte(); msg.InvokeMessageType = (InvokeMessageType)Enum.ToObject(typeof(InvokeMessageType), type); msg.ServiceId = msg.Version == 0 ? reader.ReadUShort() : reader.ReadInt(); msg.MessageId = reader.ReadUShort(); msg.Code = reader.ReadInt(); if (msg.Version == 1) { byte codeType = reader.ReadByte(); if (codeType != this._serializer.CodecType) { throw new RpcCodecException($"CodecType:{codeType} is not Match {this._serializer.CodecType}"); } msg.CodecType = (CodecType)Enum.ToObject(typeof(CodecType), codeType); } else { msg.CodecType = CodecType.Protobuf; } int left = length - headLength; if (left > 0) { if (left > reader.ReadableBytes) { throw new RpcCodecException("message not long enough!"); } msg.Data = new byte[left]; reader.ReadBytes(msg.Data); } return msg; } } } 4. 后记