运用.net core中实例讲解RabbitMQ(5)

public static void SendMessageTopic() { //创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //创建信道 using (var channel = connection.CreateModel()) { //声明交换机对象,fanout类型 string exchangeName = "topic_exchange"; channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); //队列名 string queueName1 = "topic_queue1"; string queueName2 = "topic_queue2"; //路由名 string routingKey1 = "*.orange.*"; string routingKey2 = "*.*.rabbit"; string routingKey3 = "lazy.#"; channel.QueueDeclare(queueName1, true, false, false); channel.QueueDeclare(queueName2, true, false, false); //把创建的队列绑定交换机,routingKey指定routingKey channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1); channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2); channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3); //向交换机写10条消息 for (int i = 0; i < 10; i++) { string message = $"RabbitMQ Direct {i + 1} Message"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body); channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body); Console.WriteLine($"发送Topic消息:{message}"); } } } }

这里演示了 routingKey为aaa.orange.rabbit,和lazy.aa.rabbit的情况,第一个匹配到Q1和Q2,第二个匹配到Q2,所以应该Q1是10条,Q2有20条,

执行后看rabbitMQ界面:

运用.net core中实例讲解RabbitMQ

(6)RPC模式

与上面其他5种所不同之处,该模式是拥有请求/回复的。也就是有响应的,上面5种都没有。

RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的处理业务,处理完后然后在A服务器继续执行下去,把异步的消息以同步的方式执行。

运用.net core中实例讲解RabbitMQ

客户端(C)声明一个排他队列自己订阅,然后发送消息到RPC队列同时也把这个排他队列名也在消息里传进去,服务端监听RPC队列,处理完业务后把处理结果发送到这个排他队列,然后客户端收到结果,继续处理自己的逻辑。

RPC的处理流程:

当客户端启动时,创建一个匿名的回调队列。

客户端为RPC请求设置2个属性:replyTo:设置回调队列名字;correlationId:标记request。

请求被发送到rpc_queue队列中。

RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。

客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。

服务端代码

public class RPCServer { public static void RpcHandle() { var connection = RabbitMQHelper.GetConnection(); { var channel = connection.CreateModel(); { string queueName = "rpc_queue"; channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine("【服务端】等待RPC请求..."); consumer.Received += (model, ea) => { string response = null; var body = ea.Body.ToArray(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); Console.WriteLine($"【服务端】接收到数据:{ message},开始处理"); response = $"消息:{message},处理完成"; } catch (Exception e) { Console.WriteLine("错误:" + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; } } } }

客户端

public class RPCClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RPCClient() { connection = RabbitMQHelper.GetConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; //给消息id props.ReplyTo = replyQueueName;//回调的队列名,Client关闭后会自动删除 consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var response = Encoding.UTF8.GetString(body); //监听的消息Id和定义的消息Id相同代表这条消息服务端处理完成 if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); //发送消息 channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); //等待回复 return respQueue.Take(); } public void Close() { connection.Close(); } }

执行代码

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zysfdx.html