using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs",
type: "topic");
var routingKey = "anonymous.info";
var message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic_logs",
routingKey: routingKey,
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
}
Console.ReadKey();
}
}
消费者
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs",
type: "topic");
var queueName = channel.QueueDeclare().QueueName;
var severity = "anonymous.*";
channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: severity);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey, message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
跟上面区别是 RouteKey 带了 * 号或者 #号
*号 代表1个单词
#号代表 0个以上的单词
服务端 消息路由规则 anonymous.info
换成 var severity = "a.*";
肯定收不到消息
换成# 肯定可以
*.info也可以
a# 收不到消息的
*.* 可以收到,然后把生产换成anonymous 就收不到了,因为路由规则 1个单词.第二个单词
以上内容是 AY做过测试了。
========================================================================
讲一下RPC,稍微有点绕,你理解一个既是消费端也是生产者,双方都是的,也可以配合 http请求响应稍微配合理解。
但是也是有点不一样。 我还是把上面代码注释了,还在那2个 控制台改。
场景:
如果我们需要在远程计算机上运行一个函数并等待结果呢? 嗯,这是一个不同的故事。 此模式通常称为Remote Procedure Call 或者 RPC.
在本教程中,我们将使用RabbitMQ构建RPC系统:客户端和可伸缩的RPC服务器。 由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci 斐波那契 数字的虚拟RPC服务
为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为Call的方法,该方法发送一个RPC请求并阻塞,直到收到答案为止。
请求服务器(生产者,返回一个斐波那契数字)
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class RPCServer
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "rpc_queue",
autoAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = null;
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return fib(n - 1) + fib(n - 2);
}
}
代码比较好理解的,fib是一个 返回斐波那契数字的,这里不考虑数字是否是正整数了。
自己创建一个接收请求的队列,名字叫rpc_queue,手动应答,处理完成,再应答完成。
然后收到一个消息后,处理,中间有个约定的CorrelationId 写上去,
然后 在往这个RouteKey写上 返回值的一些信息。