注意:
$表示读取队列最新进来的一个消息,不是Stream的最后一个消息。是xread block执行后,再次使用xadd添加消息后,xread block才会返回。
block 0表示永久阻塞,当消息到来时,才接触阻塞。block 1000表示阻塞1000ms,如果1000ms还没有消息到来,则返回nil
xread进行顺序消费 当使用xread进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去。
xread读取消息,完全无视消费组,此时Stream就可以理解为一个普通的list。
9、消费者组相关操作 1、消费者组命令1、创建Stream的名称是 stream-key
2、创建2个消息,aa和bb
127.0.0.1:6379> xadd stream-key * aa aa "1636362619125-0" 127.0.0.1:6379> xadd stream-key * bb bb "1636362623191-0" 3、创建消费者组 1、创建一个从头开始消费的消费者组 xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(表示从头开始消费) 2、创建一个从Stream最新的一个消息消费的消费者组 xgroup create stream-key g2 $$表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。
4、创建一个从某个消息之后消费的消费者组 xgroup create stream-key g3 1636362619125-0 #1636362619125-0 这个是上方aa消息的id的值1636362619125-0某个消息的具体的ID,这个g3消费者组中的消息都是大于>这个id的消息。
3、从消费者中读取消息 127.0.0.1:6379> xreadgroup group g1(消费组名) c1(消费者名,自动创建) count 3(读取3条) streams stream-key(Stream 名) >(从该消费者组中还未分配给另外的消费者的消息开始读取) 1) 1) "stream-key" 2) 1) 1) "1636362619125-0" 2) 1) "aa" 2) "aa" 2) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key > (nil) # 返回 nil 是因为 g2消费组是从最新的一条信息开始读取(创建消费者组时使用了$),需要在另外的窗口执行`xadd`命令,才可以再次读取到消息 127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key > #只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。 1) 1) "stream-key" 2) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> 4、读取消费者的pending消息 127.0.0.1:6379> xgroup create stream-key g4 0-0 OK 127.0.0.1:6379> xinfo consumers stream-key g1 1) 1) "name" 2) "c1" 3) "pending" 4) (integer) 2 5) "idle" 6) (integer) 88792 127.0.0.1:6379> xinfo consumers stream-key g4 (empty array) 127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0 1) 1) "stream-key" 2) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb" 127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0 1) 1) "stream-key" 2) (empty array) 127.0.0.1:6379>