前段时间需要使用rabbitmq做写缓存,一直使用pika+rabbitmq的组合,pika这个模块虽然可以很直观地操作rabbitmq,但是官方给的例子太简单,对其底层原理了解又不是很深,遇到很多坑,尤其是需要自己写连接池管理和channel池管理。虽然也有用过celery,一直也是celery+redis的组合,涉及很浅;目前打算深研一下celery+redis+rabbitmq的使用。
celery + rabbitmq初步我们先不在集成框架如flask或Django中使用,而仅仅单独使用。
简单介绍Celery 是一个异步任务队列。一个Celery安装有三个核心组件:
Celery 客户端: 用于发布后台作业。当与 Flask 一起工作的时候,客户端与 Flask 应用一起运行。
Celery workers: 运行后台作业的进程。Celery 支持本地和远程的 workers,可以在 Flask 服务器上启动一个单独的 worker,也可以在远程服务器上启动worker,需要拷贝代码;
消息代理: 客户端通过消息队列和 workers 进行通信,Celery 支持多种方式来实现这些队列。最常用的代理就是 RabbitMQ 和 Redis。
安装rabbitmq和redisrabbitmq安装和配置参考:
redis的安装和配置参考:
redis-py安装:
sudo pip install redisredis-py操作redis参考:
为了提高性能,官方推荐使用librabbitmq,这是一个连接rabbitmq的C++的库;
sudo pip install celery[librabbitmq] 初步使用使用redis做结果存储,使用rabbitmq做任务队列;
# tasks.py from celery import Celery app = Celery('tasks', broker='amqp://username:passwd@ip:port/varhost',backend='redis://username:passwd@ip:6390/db') @app.task def add(x, y): return x + y if __name__ == '__main__': result = add.delay(30, 42)broker:任务队列的中间人;
backend:任务执行结果的存储;
发生了什么事
app.task装饰后将add函数变成一个异步的任务,add.delay函数将任务序列化发送到rabbitmq;
该过程创建一个名字为celery的exchange,类型为direct(直连交换机);创建一个名为celery的queue,队列和交换机使用路由键celery绑定;
打开rabbitmq管理后台,可以看到有一条消息已经在celery队列中;
记住:当有多个装饰器的时候,celery.task一定要在最外层;
扩展
如果使用redis作为任务队列中间人,在redis中存在两个键 celery 和 _kombu.binding.celery , _kombu.binding.celery 表示有一名为 celery 的任务队列(Celery 默认),而 celery为默认队列中的任务列表,使用list类型,可以看看添加进去的任务数据。
开启worker
在task.py同一个目录下执行:
celery -A tasks worker --loglevel=infotask指的就是该celery任务的名字,注意文件名tasks.py和创建celery对象的名字必须一致,否则wroker启动失败;
执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对:
celery-task-meta-064e4262-e1ba-4e87-b4a1-52dd1418188f:data该键值对的失效时间为24小时。
分析消息
这是添加到任务队列中的消息数据。
{"body": "gAJ9cQAoWAQAAAB0YXNrcQFYGAAAAHRlc3RfY2VsZXJ5LmFkZF90b2dldGhlcnECWAIAAABpZHEDWCQAAAA2NmQ1YTg2Yi0xZDM5LTRjODgtYmM5OC0yYzE4YjJjOThhMjFxBFgEAAAAYXJnc3EFSwlLKoZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==", # body是序列化后使用base64编码的信息,包括具体的任务参数,其中包括了需要执行的方法、参数和一些任务基本信息 "content-encoding": "binary", # 序列化数据的编码方式 "content-type": "application/x-python-serialize", # 任务数据的序列化方式,默认使用python内置的序列化模块pickle "headers": {}, "properties": {"reply_to": "b7580727-07e5-307b-b1d0-4b731a796652", # 结果的唯一id "correlation_id": "66d5a86b-1d39-4c88-bc98-2c18b2c98a21", # 任务的唯一id "delivery_mode": 2, "delivery_info": {"priority": 0, "exchange": "celery", "routing_key": "celery"}, # 指定交换机名称,路由键,属性 "body_encoding": "base64", # body的编码方式 "delivery_tag": "bfcfe35d-b65b-4088-bcb5-7a1bb8c9afd9"}}将序列化消息反序列化
import pickle import base64 result = base64.b64decode('gAJ9cQAoWAQAAAB0YXNrcQFYGAAAAHRlc3RfY2VsZXJ5LmFkZF90b2dldGhlcnECWAIAAABpZHEDWCQAAAA2NmQ1YTg2Yi0xZDM5LTRjODgtYmM5OC0yYzE4YjJjOThhMjFxBFgEAAAAYXJnc3EFSwlLKoZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==') print(pickle.loads(result)) # 结果 { 'task': 'test_celery.add_together', # 需要执行的任务 'id': '66d5a86b-1d39-4c88-bc98-2c18b2c98a21', # 任务的唯一id 'args': (9, 42), # 任务的参数 'kwargs': {}, 'retries': 0, 'eta': None, 'expires': None, # 任务失效时间 'utc': True, 'callbacks': None, # 完成后的回调 'errbacks': None, # 任务失败后的回调 'timelimit': (None, None), # 超时时间 'taskset': None, 'chord': None }