AY C# RabbitMQ 2019 微笔记 (9)

image.png

using System; using System.Linq; using RabbitMQ.Client; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var routingKey = "anonymous.info"; var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); } Console.ReadKey(); } }

消费者

using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var queueName = channel.QueueDeclare().QueueName; var severity = "anonymous.*"; channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: severity); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }

image.png


跟上面区别是 RouteKey 带了 * 号或者 #号

*号 代表1个单词

#号代表 0个以上的单词


服务端 消息路由规则 anonymous.info

换成      var severity = "a.*";

肯定收不到消息

换成# 肯定可以

*.info也可以

a# 收不到消息的

*.* 可以收到,然后把生产换成anonymous 就收不到了,因为路由规则 1个单词.第二个单词

以上内容是 AY做过测试了。


========================================================================


讲一下RPC,稍微有点绕,你理解一个既是消费端也是生产者,双方都是的,也可以配合 http请求响应稍微配合理解。

但是也是有点不一样。 我还是把上面代码注释了,还在那2个 控制台改。


场景:

如果我们需要在远程计算机上运行一个函数并等待结果呢? 嗯,这是一个不同的故事。 此模式通常称为Remote Procedure Call 或者 RPC.


在本教程中,我们将使用RabbitMQ构建RPC系统:客户端和可伸缩的RPC服务器。 由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci 斐波那契 数字的虚拟RPC服务

为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为Call的方法,该方法发送一个RPC请求并阻塞,直到收到答案为止。


请求服务器(生产者,返回一个斐波那契数字)

using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) => { string response = null; var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } private static int fib(int n) { if (n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }

代码比较好理解的,fib是一个 返回斐波那契数字的,这里不考虑数字是否是正整数了。

自己创建一个接收请求的队列,名字叫rpc_queue,手动应答,处理完成,再应答完成。

然后收到一个消息后,处理,中间有个约定的CorrelationId 写上去,

然后 在往这个RouteKey写上 返回值的一些信息。


内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygdwx.html