RabbitMQ学习总结 (4)

Rpc是什么不用多说了,反正我也就知道他是远程过程调用嘛。用RabbitMQ来实现Rpc,官网有一篇简单的示例,但个人感觉RabbitMQ并不太适合做Rpc。不过用这个示例作为对RabbitMQ的一个学习成果实践还是蛮不错的,下面请看代码:

public class RpcPub { public async Task<string> SendMsg(string message) { ConnectionFactory factory = RabbitMQHelper.ConFactory; //创建一个连接,连接到服务器: using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //定义一个临时的队列,用来接收返回的消息 string replyQueueName = channel.QueueDeclare().QueueName; var consumer = new QueueingConsumer(channel); //监听该临时队列,自动act消息 channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer); string corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); //定义ReplyTo让服务端知道返回消息给哪个路由 props.ReplyTo = replyQueueName; //定义CorrelationId作为消息的唯一关联ID props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); Task<string> result = new Task<string>(() => { while (true) { string replystr = string.Empty; consumer.GetResult((args) => { if (args.BasicProperties.CorrelationId == corrId) { replystr = Encoding.UTF8.GetString(args.Body); } }); if (replystr != string.Empty) return replystr; } }); result.Start(); return await result; } } } } public class RpcConsumer : IDisposable { private ConnectionFactory factory = RabbitMQHelper.ConFactory; private IConnection connection; public void ReceiveMsg(Action<string> callback) { if (connection == null || !connection.IsOpen) connection = factory.CreateConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); //channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, arg) => { var props = arg.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; callback($"接收到消息:{Encoding.UTF8.GetString(arg.Body)}"); var responseBytes = Encoding.UTF8.GetBytes($"成功接收你的消息:{ Encoding.UTF8.GetString(arg.Body)}"); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: arg.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer); } public void Dispose() { if (connection != null && connection.IsOpen) connection.Dispose(); } } 基本流程:

当客户端发送消息之前,创建一个匿名的回调队列channel.QueueDeclare(),并监听该队列。

客户端获取匿名队列的名称,在请求中设置2个属性:replyTo=回调队列名称;CorrelationId=请求关联的唯一id

客户端发送请求到rpc_queue队列中。

RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理消息,返回结果发送到replyTo指定的队列,在请求中设置1个属性:CorrelationId=请求过来的CorrelationId

客户端监听的队列收到消息,检查correlationId是否与之前生成的匹配,匹配成功返回结果。

对于为什么要验证correlationId这一项,有两个原因,1.消息可能并不是rpc服务器发送的 2.rpc服务如果在某个阶段突然挂掉,可能会发送一个不包含correlationId的消息

RabbitMQ学习总结

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyssgz.html