Redis Stream类型的使用 (3)

阻塞读取对尾消息

注意:

$表示读取队列最新进来的一个消息,不是Stream的最后一个消息。是xread block执行后,再次使用xadd添加消息后,xread block才会返回。

block 0表示永久阻塞,当消息到来时,才接触阻塞。block 1000表示阻塞1000ms,如果1000ms还没有消息到来,则返回nil

xread进行顺序消费 当使用xread进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去。

xread读取消息,完全无视消费组,此时Stream就可以理解为一个普通的list。

9、消费者组相关操作 1、消费者组命令

消费者组命令

2、准备数据

1、创建Stream的名称是 stream-key

2、创建2个消息,aa和bb

127.0.0.1:6379> xadd stream-key * aa aa "1636362619125-0" 127.0.0.1:6379> xadd stream-key * bb bb "1636362623191-0" 3、创建消费者组 1、创建一个从头开始消费的消费者组 xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(表示从头开始消费) 2、创建一个从Stream最新的一个消息消费的消费者组 xgroup create stream-key g2 $

$表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。

4、创建一个从某个消息之后消费的消费者组 xgroup create stream-key g3 1636362619125-0 #1636362619125-0 这个是上方aa消息的id的值

1636362619125-0某个消息的具体的ID,这个g3消费者组中的消息都是大于>这个id的消息。

3、从消费者中读取消息 127.0.0.1:6379> xreadgroup group g1(消费组名) c1(消费者名,自动创建) count 3(读取3条) streams stream-key(Stream 名) >(从该消费者组中还未分配给另外的消费者的消息开始读取) 1) 1) "stream-key" 2) 1) 1) "1636362619125-0" 2) 1) "aa" 2) "aa" 2) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key > (nil) # 返回 nil 是因为 g2消费组是从最新的一条信息开始读取(创建消费者组时使用了$),需要在另外的窗口执行`xadd`命令,才可以再次读取到消息 127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key > #只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。 1) 1) "stream-key" 2) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> 4、读取消费者的pending消息 127.0.0.1:6379> xgroup create stream-key g4 0-0 OK 127.0.0.1:6379> xinfo consumers stream-key g1 1) 1) "name" 2) "c1" 3) "pending" 4) (integer) 2 5) "idle" 6) (integer) 88792 127.0.0.1:6379> xinfo consumers stream-key g4 (empty array) 127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0 1) 1) "stream-key" 2) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0 1) 1) "stream-key" 2) (empty array) 127.0.0.1:6379>

从消费者的pending列表中读取消息

5、转移消费者的消息 127.0.0.1:6379> xpending stream-key g1 - + 10 c1 1) 1) "1636362619125-0" 2) "c1" 3) (integer) 2686183 4) (integer) 1 2) 1) "1636362623191-0" 2) "c1" 3) (integer) 102274 4) (integer) 7 127.0.0.1:6379> xpending stream-key g1 - + 10 c2 (empty array) 127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xpending stream-key g1 - + 10 c2 1) 1) "1636362623191-0" 2) "c2" 3) (integer) 17616 4) (integer) 8 127.0.0.1:6379>

xclaim转移消息

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzsjp.html