基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

RabbitMQ的是一个复杂的野兽。 

它灵活,强大,但也很难完全把控和掌握。 

许多不同的使用情况和使用模式都可以建立在这个强大的软件之上,但在第一次尝试为一个特定的解决方案编写代码时,差错和设计错误也是司空见惯的事情。 

在本文中,我将讨论   Palermo 的设计和实现,它是一个实现了可以将RabbitMQ用作底层排队机制构建的那些可能的使用模式其中之一:批处理作业处理系统。 

通过批处理作业处理系统,我们引用了一种机制来由 作业程序从不同队列中提取作业的自动执行。客户可以排入新的作业到这些队列,它们最终将被传递给将执行作业的程序。如果在执行一个任务失败了,作业程序线程将会把失败的作业放入一个特殊的队列,它可以重新执行,或者检查出来进行错误调试。

创建Palermo的灵感主要来自于Resque ,它是一个作业处理系统,由Github使用Ruby和Redis创建.

对于系统的使用者,Resque让这些变成可能:定义拥有不同名字的队列;作业排队时,在系统中通过某个输入参数匹配Ruby类名;在不同的机器中启动工作者进程--它们将处理作业,实例化Ruby类,并使用提供的输入参数来执行作业.如果作业执行失败,作业将被路由到一个特殊的"失败"作业队列.在这个队列中的作业会重新执行或被删除.Resque工作者和底层的操作系统很好的集成在一起,它可以像系统操作者控制作业执行的方式,来处理即将到来的信号.它也提供了一个web接口,可以监控作业和工作者的状态,同时也可以执行某种动作,像作业的重新排队或队列中作业的清理.

从一个开发者的角度来讲,Resque的主要优势是整个系统的使用超级简单.定义和排队作业只需要几行Ruby代码即可,同时系统以一种持续的和鲁棒的方式运行.

Palermo的目标是针对JVM语言,创建一个简单易用的作业处理系统,像Resque一样的健壮,并使用RabbitMQ作为底层的队列技术而不是Redis.

从一个正式的角度来讲,一个作业处理系统的队列技术可以定义为一个元组空间(tuple space),一种连续关联的内存,一些进程排队作业使用如下的方式编写元组:

write(queue_name, job_type, input_argument)

工作者进程从内存删除元组,一次一个,使用谓词匹配队列名:

read(queue_name, ?, ?)

一个仅有的,必须以元组空间,加入到作业处理队列技术模型中的限制是,来自分布式内存中的读函数必须遵从先进先出(FIFO)的语义.RabbitMQ队列可以看作是这种类型的,包含FIFO语义的内存.队列名作为第一个参数传递到"读"谓词,用来提取存储在内存中的下一个匹配的元组.

基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

为了获取想要读取的元组空间语义,我们需要处理RabbitMQ内部的一些设置,并配置如下的选项:

- 队列的持久性
– 消息的持久性
– 服务的质量
– 消息确认

由于要使用可持续的内存,我们需要为RabbitMQ所管理的队列和消息增加可持续性.为了达到这个效果,在RabbitMQ之中,队列需要声明为"durable",消息需要声明为"persistent".通过这种方式,即便RabbitMQ broker崩溃了,对于已经创建的队列信息和未决的消息将在重启时恢复.

当超过一个工作者连接到同一个队列时,RabbitMQ会使用round robin的方式,在所有的可用工作者中分发消息.这里主要的问题在于,只要消息到达队列,RabbitMQ就将发送一个消息到下一个工作者,而不管这个工作者是否正在处理一个不同的消息.如果一个作业需要很长的时间来处理,到来的消息将堆叠在这个繁忙工作者的本地缓存中,而其他的工作者将处于闲置状态.我们可以使用RabbitMQ的两个特性来处理这种情况:消息确认和服务质量/预取数量.

首先,工作者采用显式确认的方式处理完消息后可以通知RabbitMQ.消息只有确认后才会从队列删除.如果工作者停止运行而没有确认消息,RabbitMQ会自动将消息重新排队.

同时,服务质量(qos)配置可以告知RabbitMQ,发送给特定工作者的最大未确认消息数.如果设置了这个值,即从预取数量到1,只有最后一条消息被工作者进程确认,其他消息才能从队列发送出去.
通过这种方式,可以实现在工作者之间合理的分配作业.

基于 RabbitMQ 构建一个类似 Resque 的作业处理系统

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/72db35af9f2c89c293ef18d81bd5bd72.html