关于RabbitMQ是什么以及它的概念,不了解的可以先查看一下下面推荐的几篇博客
https://blog.csdn.net/whoamiyang/article/details/54954780
https://www.cnblogs.com/frankyou/p/5283539.html
https://blog.csdn.net/mx472756841/article/details/50815895
官网介绍:
本文github源码:
因为之前不了解交换机及AMQP协议,上来就研究RabbitMQ,很多概念都有点蒙圈,所以建议大家在学习RabbitMQ之前先对一些概念有基本的了解
安装与配置:下载Erlang:
下载rabbitmq:
环境变量配置:
新增:ERLANG_HOME= C:\Program Files\erlx.x 新增:RABBITMQ_SERVER=C:\Program Files\RabbitMQ Server\rabbitmq_server-x.x.x Path中新增:%ERLANG_HOME%\bin;%RABBITMQ_SERVER%\sbin; 服务相关命令rabbitmq-plugins enable rabbitmq_management //开启管理插件
rabbitmq-service.bat start //开启服务
rabbitmq-service.bat stop //关闭服务
rabbitmqctl list_queues //查看任务
注意在执行命令rabbitmqctl list_queues时若报错unable to perform an operation on node。。。。,可将C:\Users\用户名\.erlang.cookie.erlang.cookie文件拷贝到C:\Windows\System32\config\systemprofile\.erlang.cookie中替换,然后重启服务
至此RabbitMQ服务我们已经安装好了
后台管理开启管理插件后我们重启rabbitmq服务,打开:15672/后台管理界面,
用户名和密码均为guest
guest账户在最新版本只能通过localhost登陆了,如果想要通过ip来登陆需要设置一下配置文件:
找到/rabbitmq_server-x.x.x/ebin下面的rabbit.app文件文件: 找到:loopback_users将里面的<<”guest”>>删除。
删除后的内容为:{loopback_users, []},然后重启服务
关于用户密码管理的操作我们都可以在管理页面中设置
默认端口:client端通信口5672
管理口15672
server间内部通信口25672
erlang发现口:4369
想要修改默认端口可修改 安装目录下 etc/rabbitmq.config文件,有个默认的example,改一改就可以了
发送消息我们先构建一个应用程序,建议创建一个winform或wpf程序,控制台在这里并不太好用。
项目中引用nuget包:RabbitMQ.Client
接下来我们编写一个发送消息和接收消息的代码:
public void SendMsg(string message) { //这里的端口及用户名都是默认的,可以直接设置一个hostname=“localhost”其他的不用配置 var factory = new ConnectionFactory() { HostName = "192.168.1.15",Port=5672,UserName= "guest",Password= "guest" }; //创建一个连接,连接到服务器: using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //创建一个名称为hello的消息队列 //durable:队列持久化,为了防止RabbitMQ在退出或者crash等异常情况下数据不会丢失,可以设置durable为true //exclusive:排他队列,只对首次声明它的连接(Connection)可见,不允许其他连接访问,在连接断开的时候自动删除,无论是否设置了持久化 //autoDelete:自动删除,如果该队列已经没有消费者时,该队列会被自动删除。这种队列适用于临时队列。 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: true, arguments: null); //channel.BasicConsume("hello", autoAck: true); var props = channel.CreateBasicProperties(); //消息持久化,若启用durable则该属性启用 props.Persistent = true; //封装消息主体 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: props, body: body); Console.WriteLine(" 发送消息{0}", message); } } } public class Consumer : IDisposable { public static int _number; private static ConnectionFactory factory; private static IConnection connection; static Receive() { factory = new ConnectionFactory() { HostName = "localhost" }; } public Receive() { _number++; } public void ReceiveMsg(Action<string> callback) { if(connection==null||!connection.IsOpen) connection = factory.CreateConnection(); IModel _channel = connection.CreateModel(); _channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: true, arguments: null); _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); // 创建事件驱动的消费者 var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); callback($"number:{_number}.message:{message}"); //模拟消息处理需要两秒 Thread.Sleep(2000); //显示发送ack确认接收并处理完成消息,只有在前面进行启用显示发送ack机制后才奏效。 _channel.BasicAck(ea.DeliveryTag, false); }; //指定消费队列,autoAct是否自动确认 string result = _channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer); //设置后当所有的channel都关闭了连接会自动关闭 //connection.AutoClose = true; } public void Dispose() { if (connection != null && connection.IsOpen) connection.Dispose(); } }上面一个很简单的消息队列的发送者和消费者,解释下基本的流程: