RabbitMQ学习总结 (3)

上面的一个示例中我们用的是默认的交换机发送消息,我们可以通过给exchange赋值来使用指定的交换机,通过QueueBind将交换机与队列进行绑定

_channel.QueueBind("log1", "logs", "info");

声明一个交换机的代码如下

_channel.ExchangeDeclare("logs", ExchangeType.Direct, false, false);

我们将队列log1绑定到了交换机:logs上,路由为info,交换机的类型为Direct,Direct代表的是路由完全匹配,现在我们向logs交换机发送一条消息,路由为info,队列log1就会接收到消息了

channel.BasicPublish(exchange: "logs", routingKey: "info", basicProperties: props, body: body);

队列和交换机的关系是多对多的,交换机的类型常用的有三个:Direct,Fanout,Topic,Headers

Direct:要求路由键完全匹配

Fanout:忽略路由键,给所有绑定到交换机上的队列都发送消息

Topic:模糊匹配,通过字母配合符号“*”和“#”来设置路由键

Headers:Headers类型用的比较少,它也忽略路由键,而是匹配交换机的headers,headers为键值对的hashtable,对publisher和consumer两边设置的header进行匹配,需要指定匹配的方式是 all还是any,具体代码可看github

下面展示了一个使用direct类型交换机的相关代码

public class LogDirectPub { public void SendMsg(string message) { var factory = new ConnectionFactory() { HostName = "192.168.1.15", Port = 5672, UserName = "guest", Password = "guest" }; //创建一个连接,连接到服务器: using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { var props = channel.CreateBasicProperties(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "info", basicProperties: props, body: body); channel.BasicPublish(exchange: "logs", routingKey: "error", basicProperties: props, body: body); Console.WriteLine("发送消息{0}", message); } } } } public class LogDirectConsumer : IDisposable { private static ConnectionFactory factory; private static IConnection connection; static LogDirectConsumer() { factory = new ConnectionFactory() { HostName = "localhost" }; } public void ReceiveMsg(Action<string> callback) { if (connection == null || !connection.IsOpen) connection = factory.CreateConnection(); IModel _channel = connection.CreateModel(); _channel.ExchangeDeclare("logs", ExchangeType.Direct, false, false); _channel.QueueDeclare(queue: "log1", durable: false, exclusive: false, autoDelete: false, arguments: null); _channel.QueueBind("log1", "logs", "info"); _channel.QueueBind("log1", "logs", "error"); _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); callback($"log1Write.message:{ea.RoutingKey}:{message}"); //模拟消息处理需要两秒 Thread.Sleep(2000); _channel.BasicAck(ea.DeliveryTag, false); }; string result = _channel.BasicConsume(queue: "log1", autoAck: false, consumer: consumer); } public void Dispose() { if (connection != null && connection.IsOpen) connection.Dispose(); } } RabbitMQ Management HTTP API

RabbitMQ有一套自己的http/api,地址为:15672/api,可以查询你想查的所有信息配置,通过这些api,我们可以自己实现RabbitMQ的监控管理,英文看的头痛,这里有一篇中文的翻译文档:

这是一个获取所有队列的简单示例:

string username = "guest"; string password = "guest"; string queuesUrl = "http://localhost:15672/api/queues"; /// <summary> /// 查询所有队列 /// </summary> /// <returns></returns> public string GetAllQuenes() { string jsonContent = GetApiResult(queuesUrl).Result; List<QueueModel> queues = JsonConvert.DeserializeObject<List<QueueModel>>(jsonContent); return JsonConvert.SerializeObject(queues); } private async Task<string> GetApiResult(string Url) { var client = new HttpClient(); var passByte = Encoding.UTF8.GetBytes(string.Format("{0}:{1}", username, password)); client.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", Convert.ToBase64String(passByte)); using (HttpResponseMessage response = await client.GetAsync(Url).ConfigureAwait(false)) { string result = await response.Content.ReadAsStringAsync().ConfigureAwait(false); return result; } } 自定义Consumer

之前说过用QueueingBasicConsumer会有性能问题,但是eventconsumer无法阻塞线程,对于某些需要阻塞线程的功能用起来不太方便,这时我们就可以自定义一个Consumer继承DefaultBasicConsumer,只需要实现其中的HandleBasicDeliver函数就可以了,下面是我定义的一个consumer,用来实现后面的Rpc客户端

public class QueueingConsumer : DefaultBasicConsumer { private IModel _channel; private BasicDeliverEventArgs args = new BasicDeliverEventArgs(); private AutoResetEvent argResetEvent = new AutoResetEvent(false); public QueueingConsumer(IModel channel) { _channel = channel; } public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) { args = new BasicDeliverEventArgs { ConsumerTag = consumerTag, DeliveryTag = deliveryTag, Redelivered = redelivered, Exchange = exchange, RoutingKey = routingKey, BasicProperties = properties, Body = body }; argResetEvent.Set(); } public void GetResult(Action<BasicDeliverEventArgs> callback) { argResetEvent.WaitOne(); callback(args); } } RPC实现

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyssgz.html