该队列声明的变更需要同时作用于生产者和消费者两处的代码(参考第一章中 Receiving 这一节提到的“尝试从队列中消费消息时,确保队列总是已存在的”,因为无法保障会先打开哪一个终端,所以该队列声明的代码要写两处)。
At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting IBasicProperties.SetPersistent to true.
此时此刻,我们可以确定即使 RabbitMQ 重启了,名为“task_queue”的队列也不再丢失。现在我们需要通过设置 IBasicProperties.SetPersistent 的值为 true,从而标识消息为可持久的。
var properties = channel.CreateBasicProperties(); properties.Persistent = true; Note on message persistence 注意消息的持久Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.
将消息标识为持久并不能完全保证一个消息也不会被丢失。尽管该标识告诉 RabbitMQ 要将消息保存到磁盘,但是当 RabbitMQ 已经接到一个消息却尚未保存它之际,将仍然有一个很小的时间窗口。另外,很可能这还只是保存到了缓存而未实实在在地写入到磁盘。尽管该持久保障措施还不是很强,但对于我们简单的任务队列已经是绰绰有余。如果你需要一个更强大的保障,可以使用发布者确认机制。
Fair dispatch 公平分发You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.
你可能也注意到了,这种分发模式也不如我们所期望。比如在有两个工作单元的情景下,(并有多条消息相继而来),假设奇数项的消息比较冗繁,而偶数项的消息相对轻巧些。这样,其中一个工作单元将会持续地繁忙,而另一个工作单元则几乎不做任何事情。然而,RabbitMQ 并不知情而且还会继续朝奇数方向分发消息。
This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.
之所以发生这样的情形,是因为当消息进入到队列时 RabbitMQ 就开始分发,而忽视了消费者这边未确认消息的数量,它只是盲目地向第 n 个消费者分发每一条消息。
In order to change this behavior we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
为了改变这种行为,我们可以使用 BasicQos 方法的 prefetchCount = 1 设置。它将告诉 RabbitMQ 向工作单元分发消息时一次不要超过一个。或者换一句话来讲,直到一个工作单元已处理完成并确认过上一个消息时,才把消息发送给它。反之,RabbitMQ 会把消息分发给下一个并不繁忙的工作单元。(从而达到公平分发的效果。)
channel.BasicQos(0, 1, false); Note about queue size 注意队列大小If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
如果所有的工作单元都很繁忙,你的队列将会被填满,这时就需要你密切注视它,也许可以添加更多的工作单元,或者采取其他的策略。
Putting it all together 融合一起Final code of our NewTask.cs class: