为了确保消息永远不会丢失,RabbitMQ支持消息确认机制。消费者回发一个确认信号Ack(nowledgement)给RabbitMQ,告诉它某个消息已经被接收、处理并且可以自由删除它。
如果一个消费者在还没有回发确认信号之前就挂了(其通道关闭,连接关闭或者TCP连接丢失),RabbitMQ会认为该消息未被完全处理,并将其重新排队。如果有其他消费者同时在线,该消息将会被会迅速重新分发给其他消费者。这样,即便Worker意外挂掉,也可以确保消息不会丢失。
没有任何消息会超时;当消费者死亡时,RabbitMQ将会重新分发消息。即使处理消息需要非常非常长的时间也没关系。
默认情况下,手动消息确认模式是开启的。在前面的例子中,我们通过将autoAck(“自动确认模式”)参数设置为true来明确地关闭手动消息确认模式。一旦完成任务,是时候删除这个标志并且从Worker手动发送一个恰当的确认信号给RabbitMQ。
// 构建消费者实例。 var consumer = new EventingBasicConsumer(channel); // 绑定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模拟耗时操作。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); // 手动发送消息确认信号。 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // autoAck:false - 关闭自动消息确认,调用`BasicAck`方法进行手动消息确认。 // autoAck:true - 开启自动消息确认,当消费者接收到消息后就自动发送ack信号,无论消息是否正确处理完毕。 channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);使用上面这段代码,我们可以确定的是,即使一个Worker在处理消息时,我们通过使用CTRL + C来终止它,也不会丢失任何消息。Worker挂掉不久,所有未确认的消息将会被重新分发。
忘记确认
遗漏BasicAck是一个常见的错误。这是一个很简单的错误,但导致的后果却是严重的。当客户端退出时(看起来像是随机分发的),消息将会被重新分发,但是RabbitMQ会吃掉越来越多的内存,因为它不能释放未确认的消息。
为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:
在Windows上,删除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged 5.消息持久化我们已经学习了如何确保即使消费者挂掉,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务还是会丢失。
当RabbitMQ退出或崩溃时,它会忘记已存在的队列和消息,除非告诉它不要这样做。为了确保消息不会丢失,有两件事是必须的:我们需要将队列和消息标记为持久。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了做到这一点,我们需要把队列声明是持久的(Durable):
// 声明队列,通过指定durable参数为`true`,对消息进行持久化处理。 channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);虽然这个命令本身是正确的,但是它在当前设置中不会起作用。那是因为我们已经定义过一个名为hello的队列,并且这个队列不是持久化的。RabbitMQ不允许使用不同的参数重新定义已经存在的队列,并会向尝试执行该操作的程序返回一个错误。但有一个快速的解决办法 - 让我们用不同的名称声明一个队列,例如task_queue:
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);注意,该声明队列QueueDeclare方法的更改需要同时应用于生产者和消费者代码。
此时,我们可以确定的是,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久的(Persistent) - 通过将IBasicProperties.Persistent设置为true。
// 将消息标记为持久性。 var properties = channel.CreateBasicProperties(); properties.Persistent = true;关于消息持久性的说明
将消息标记为Persistent并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但当RabbitMQ接收到消息并且尚未保存消息时仍有一段时间间隔。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存中,并没有真正写入磁盘。消息的持久化保证并不健壮,但对于简单的任务队列来说已经足够了。如果您需要一个更加健壮的保证,可以使用发布者确认。