看看下面的代码。它订阅所有键空间通知并打印任何收到的。
import time from redis import StrictRedis redis = StrictRedis(host='localhost', port=6379) pubsub = redis.pubsub() pubsub.psubscribe('__keyspace@0__:*') print('Starting message loop') while True: message = pubsub.get_message() if message: print(message) else: time.sleep(0.01)这就是我们创建Redis连接的方式:
redis = StrictRedis(host='localhost', port=6379)默认情况下,所有响应都以字节形式返回。用户负责解码它们。如果应解码来自客户端的所有字符串响应,则用户可以将SID_responses = True指定为StrictRedis。在这种情况下,任何返回字符串类型的Redis命令都将使用指定的编码进行解码。
接下来,我们创建一个pubsub对象,该对象订阅一个频道并侦听新消息:
pubsub = redis.pubsub() pubsub.psubscribe('__keyspace@0__:*')然后我们通过无限循环等待事件:
while True: message = pubsub.get_message() ...如果有数据,get_message()将读取并返回它。如果没有数据,则该方法将返回None。
从pubsub实例读取的每条消息都是一个包含以下键的字典:
键入:下列之一:subscribe,unsubscribe,psubscribe,punsubscribe,message,pmessage
channel:订阅的频道或发布消息的频道
pattern:匹配已发布消息的通道的模式(除类型外在所有情况下均为Nonepmessage)
data:消息数据
现在启动python脚本,在另一个终端输入带有值的redis-cli和SET键mykeymyvalue
127.0.0.1:6379> set mykey myvalue OK您将看到脚本的以下输出:
$ python subscribe.py Starting message loop {'type': 'psubscribe', 'data': 1, 'channel': b'__keyspace@0__:*', 'pattern': None} {'type': 'pmessage', 'data': b'set', 'channel': b'__keyspace@0__:mykey', 'pattern': b'__keyspace@0__:*'} 回调也可以注册回调函数来处理已发布的消息。消息处理程序只接受一个参数即消息。要使用消息处理程序订阅通道或模式,请将通道或模式名称作为关键字参数传递,其值为回调函数。当使用消息处理程序在通道或模式上读取消息时,将创建消息字典并将其传递给消息处理程序。在这种情况下,从get_message()返回None值,因为消息已经处理完毕。
import time from redis import StrictRedis redis = StrictRedis(host='localhost', port=6379) pubsub = redis.pubsub() def event_handler(msg): print('Handler', msg) pubsub.psubscribe(**{'__keyspace@0__:*': event_handler}) print('Starting message loop') while True: message = pubsub.get_message() if message: print(message) else: time.sleep(0.01) 127.0.0.1:6379> set mykey myvalue OK如您所见,set事件mykey由event_handler回调处理。
$ python subscribe2.py Starting message loop {'pattern': None, 'channel': b'__keyspace@0__:*', 'data': 1, 'type': 'psubscribe'} Handler {'pattern': b'__keyspace@0__:*', 'channel': b'__keyspace@0__:mykey', 'data': b'set', 'type': 'pmessage'} 单独线程中的事件循环另一种选择是在单独的线程中运行事件循环:
import time from redis import StrictRedis redis = StrictRedis(host='localhost', port=6379) def event_handler(msg): print(msg) thread.stop() pubsub = redis.pubsub() pubsub.psubscribe(**{'__keyevent@0__:expired': event_handler}) thread = pubsub.run_in_thread(sleep_time=0.01)上面的代码创建了一个新线程并启动了事件循环。处理完第一个过期事件后,我们使用该thread.stop()方法关闭事件循环和线程。
在幕后,这只是一个围绕get_message()的包装器,它在一个单独的线程中运行。run_in_thread()采用可选sleep_time参数。如果指定,则事件循环将使用循环的每次迭代中的值调用time.sleep()。
127.0.0.1:6379> set mykey myvalue ex 1 OK预期产量:
$ python subscribe3.py {'type': 'pmessage', 'channel': b'__keyevent@0__:expired', 'pattern': b'__keyevent@0__:expired', 'data': b'mykey'} 概要