最近在看redis这方面的知识,发现在redis5中产生了一种新的数据类型Stream,它和kafka的设计有些类似,可以当作一个简单的消息队列来使用。
二、redis中Stream类型的特点
是可持久化的,可以保证数据不丢失。
支持消息的多播、分组消费。
支持消息的有序性。
三、Stream的结构解释:
消费者组: Consumer Group,即使用 XGROUP CREATE 命令创建的,一个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。
同一条消息,只能被这个消费者组中的某个消费者获取。
多个消费者之间是相互独立的,互不干扰。
消费者: Consumer 消费消息。
last_delivered_id: 这个id保证了在同一个消费者组中,一个消息只能被一个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个last_delivered_id的值会往后移动一位,保证消费者不会读取到重复的消息。
pending_ids:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调用ack命令。这样就确保了某个消息一定会被执行一次。
消息内容:是一个键值对的格式。
Stream 中 消息的 ID: 默认情况下,ID使用 * ,redis可以自动生成一个,格式为 时间戳-序列号,也可以自己指定,一般使用默认生成的即可,且后生成的id号要比之前生成的大。
四、Stream的命令 1、XADD 往Stream末尾添加消息 1、命令格式 xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]xadd 命令 返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第几条消息)
1、向流中增加一条数据, 127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key这个流中增加一个 username 是zhangsan的数据 *表示自动生成id "1635999858912-0" # 返回的是ID 127.0.0.1:6379> keys * 1) "stream-key" # 可以看到stream自动创建了 127.0.0.1:6379> 2、向流中增加数据,不自动创建流 127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了nomkstream参数,而not-exists-stream之前不存在,所以加入失败 (nil) 127.0.0.1:6379> keys * (empty array) 127.0.0.1:6379> 3、手动指定ID的值 127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处id的值是自己传递的1-1,而不是使用*自动生成 "1-1" # 返回的是id的值 127.0.0.1:6379> 4、设置一个固定大小的Stream 1、精确指定Stream的大小指定指定Stream的大小比模糊指定Stream的大小会稍微多少消耗一些性能。
~ 模糊指定流的大小,可以看到指定的是1,实际上已经到了3.
2、XRANGE查看Stream中的消息 1、命令格式 xrange key start end [COUNT count]