消费者:
using System; using System.Collections.Concurrent; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; props.ReplyTo = replyQueueName; consumer.Received += (model, ea) => { var body = ea.Body; var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); } public void Close() { connection.Close(); } } public class Rpc { public static void Main() { var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); Console.ReadLine(); } }以上的调用没有进行类型判断响应,比如输入的是否合法,如果服务器没有运行等,这里是最简单的调用示例。
客户端是否应该有超时设计等。
比如服务器处理发生异常,是否应该转发回客户端,说没处理好。
此处介绍的设计并不是RPC服务的唯一可能实现,但它具有一些重要优势:
如果RPC服务器太慢,您可以通过运行另一个服务器来扩展。 尝试在新控制台中运行第二个RPCServer。
在客户端,RPC只需要发送和接收一条消息。 不需要像QueueDeclare这样的同步调用。 因此,对于单个RPC请求,RPC客户端只需要一次网络往返。
测试:
先运行 服务端,服务端等待 消费者的请求,然后接收到,处理返回给 消费者。
到此,基本的类库调用 AY讲解完了。
消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。
这种场景: 延迟任务
知识:消息的TTL和死信Exchange
RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。
rabbitmq-delayed-message-exchange这个插件也可以实现,看C#实现。