在CentOS上安装RabbitMQ流程(3)

4. 测试     这里有两种测试方法,可以在刚才安装的管理里面进行测试,也可以通过客户端来进行测试,RabbitMQ支持丰富的客户端语言

    这里有相应的工具下载
   

    我们这里使用Python来做一下测试,因为其代码比较简洁。

    首先,我们需要一个Python的AMQP库。有两个可选:

py-amqplib – 通用的AMQP
txAMQP – 使用 Twisted 框架的AMQP库,因此允许异步I/O。
根据你的需求,py-amqplib或者txAMQP都是可以的。因为是基于Twisted的,txAMQP可以保证用异步IO构建超高性能的AMQP程序。但是Twisted编程本身就是一个很大的主题……因此清晰起见,我们打算用 py-amqplib。更新:请参见Esteve Fernandez关于txAMQP的使用和代码样例的回复。

AMQP支持在一个TCP连接上启用多个MQ通信channel,每个channel都可以被应用作为通信流。每个AMQP程序至少要有一个连接和一个channel。

from amqplib import client_0_8 as amqp   conn = amqp.Connection(host="localhost:5672 "userid="guest",   password="guest"virtual_host="/"insist=False)   chan = conn.channel()  

每个channel都被分配了一个整数标识,自动由Connection()类的.channel()方法维护。或者,你可以使用.channel(x)来指定channel标识,其中x是你想要使用的channel标识。通常情况下,推荐使用.channel()方法来自动分配channel标识,以便防止冲突。

现在我们已经有了一个可以用的连接和channel。现在,我们的代码将分成两个应用,生产者(producer)和消费者(consumer)。我们先创建一个消费者程序,他会创建一个叫做“po_box”的队列和一个叫“sorting_room”的交换机:

chan.queue_declare(queue="po_box"durable=True,   exclusive=Falseauto_delete=False)   chan.exchange_declare(exchange="sorting_room"type="direct"durable=True,   auto_delete=False,)  

这段代码干了啥?首先,它创建了一个名叫“po_box”的队列,它是durable的(重启之后会重新建立),并且最后一个消费者断开的时候不会自动删除(auto_delete=False)。在创建durable的队列(或者交换机)的时候,将auto_delete设置成false是很重要的,否则队列将会在最后一个消费者断开的时候消失,与durable与否无关。如果将durable和auto_delete都设置成True,只有尚有消费者活动的队列可以在RabbitMQ意外崩溃的时候自动恢复。

(你可以注意到了另一个标志,称为“exclusive”。如果设置成True,只有创建这个队列的消费者程序才允许连接到该队列。这种队列对于这个消费者程序是私有的)。

还有另一个交换机声明,创建了一个名字叫“sorting_room”的交换机。auto_delete和durable的含义和队列是一样的。但是,.excange_declare() 还有另外一个参数叫做type,用来指定要创建的交换机的类型(如前面列出的): fanout, direct 和 topic.

到此为止,你已经有了一个可以接收消息的队列和一个可以发送消息的交换机。不过我们需要创建一个绑定,把它们连接起来。

chan.queue_bind(queue=”po_box”, exchange=”sorting_room”,   routing_key=”jason”)  

这个绑定的过程非常直接。任何送到交换机“sorting_room”的具有路由键“jason” 的消息都被路由到名为“po_box” 的队列。

现在,你有两种方法从队列当中取出消息。第一个是调用chan.basic_get(),主动从队列当中拉出下一个消息(如果队列当中没有消息,chan.basic_get()会返回None, 因此下面代码当中print msg.body 会在没有消息的时候崩掉):

msg = chan.basic_get("po_box")   print msg.body   chan.basic_ack(msg.delivery_tag)  

但是如果你想要应用程序在消息到达的时候立即得到通知怎么办?这种情况下不能使用chan.basic_get(),你需要用chan.basic_consume()注册一个新消息到达的回调。

def recv_callback(msg):       print 'Received: ' + msg.body   chan.basic_consume(queue='po_box'no_ack=True,   callback=recv_callbackconsumer_tag="testtag")   while True:       chan.wait()   chan.basic_cancel("testtag")  

chan.wait() 放在一个无限循环里面,这个函数会等待在队列上,直到下一个消息到达队列。chan.basic_cancel() 用来注销该回调函数。参数consumer_tag 当中指定的字符串和chan.basic_consume() 注册的一直。在这个例子当中chan.basic_cancel() 不会被调用到,因为上面是个无限循环…… 不过你需要知道这个调用,所以我把它放在了代码里。

需要注意的另一个东西是no_ack参数。这个参数可以传给chan.basic_get()和chan.basic_consume(),默认是false。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将no_ack 参数设置为true,则py-amqplib会为下一个AMQP请求添加一个no_ack属性,告诉AMQP服务器不需要等待回馈。但是,大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用chan.basic_ack()方法,使用消息的delivery_tag属性作为参数。参见chan.basic_get() 的实例代码。

好了,这就是消费者的全部代码。(下载:amqp_consumer.py )

不过没有人发送消息的话,要消费者何用?所以需要一个生产者。下面的代码示例表明如何将一个简单消息发送到交换区“sorting_room”,并且标记为路由键“jason” :

msg = amqp.Message("Test message!")   msg.properties["delivery_mode"] = 2   chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")  

你也许注意到我们设置消息的delivery_mode属性为2,因为队列和交换机都设置为durable的,这个设置将保证消息能够持久化,也就是说,当它还没有送达消费者之前如果RabbitMQ重启则它能够被恢复。

剩下的最后一件事情(生产者和消费者都需要调用的)是关闭channel和连接:

chan.close()   conn.close()  

很简单吧。(下载:amqp_publisher.py )

来真实地跑一下吧……
现在我们已经写好了生产者和消费者,让他们跑起来吧。假设你的RabbitMQ在localhost上安装并且运行。

打开一个终端,执行python ./amqp_consumer.py让消费者运行,并且创建队列、交换机和绑定。

然后在另一个终端运行python ./amqp_publisher.py “AMQP rocks.” 。如果一切良好,你应该能够在第一个终端看到输出的消息。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/psjzw.html