public static void SendMessage() { //延时消息交换机 string delayExchange = "delay.exchange"; //延时消息队列 string delayQueueName = "delay_queue"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { Dictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-delayed-type", "direct"); //x-delayed-type必须加 //创建延时交换机,type类型为x-delayed-message channel.ExchangeDeclare(delayExchange, type: "x-delayed-message", durable: true, autoDelete: false,arguments: args); //创建延时消息队列 channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false); //交换机绑定队列 channel.QueueBind(delayQueueName, delayExchange, routingKey: delayQueueName); string message = "hello rabbitmq message 10s后处理"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; //延时时间从header赋值 Dictionary<string, object> headers = new Dictionary<string, object>(); headers.Add("x-delay", 10000); properties.Headers = headers; //发布消息,按时10s channel.BasicPublish(exchange: delayExchange, routingKey: delayQueueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向队列:{delayQueueName}发送消息:{message},延时:10s"); string message2 = "hello rabbitmq message 5s后处理"; var properties2 = channel.CreateBasicProperties(); properties2.Persistent = true; //延时时间从header赋值 Dictionary<string, object> headers2 = new Dictionary<string, object>(); headers2.Add("x-delay", 5000); properties2.Headers = headers2; //发布消息,延时5s channel.BasicPublish(exchange: delayExchange, routingKey: delayQueueName, basicProperties: properties2, body: Encoding.UTF8.GetBytes(message2)); Console.WriteLine($"{DateTime.Now},向队列:{delayQueueName}发送消息:{message2},延时:5s"); } } }
消费者代码:
public static void DelayMessageConsumer() { //延时队列 string queueName = "delay_queue"; var connection = RabbitMQHelper.GetConnection(); { //创建信道 var channel = connection.CreateModel(); { var consumer = new EventingBasicConsumer(channel); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); consumer.Received += (model, ea) => { //处理业务 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{DateTime.Now},接收到消息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(queueName, autoAck: false, consumer); } } }
执行代码:
RabbitMQ管理界面,只有一个队列: