当我们准备好进行数据的记录与插入后,我们就需要将数据保存为哈希,并在有序集合中插入一个成员/分数对,分别对应事件的id(成员)与事件的时间戳(分数)。记录某个事件的代码如下
def record_event(conn, event):
id = conn.incr('event:id')
event['id'] = id
event_key = 'event:{id}'.format(id=id)
pipe = conn.pipeline(True)
pipe.hmset(event_key, event)
pipe.zadd('events', **{id: event['timestamp']})
pipe.execute()
在这个record_event()函数中,我们获取了一个事件,从Redis中获得一个计算得出的新id,将它赋给事件,并生成了事件保存的键。这个键的构成是字符串“event”加上新的id,并在两者之间由冒号分割所构成的 3 。随后我们创建了一个管道,并准备设置该事件相关的全部数据,同时准备将事件id与时间戳对保存在有序集合中。当事务管道完成执行之后,这一事件将被记录并保存在Redis中。
事件分析从现在起,我们可以通过多种方式对时间序列进行分析了。我们可以通过ZRANGE 4 的设置对最新或最早的事件id进行扫描,并且可以在稍后获取这些事件本身以进行分析。通过结合使用ZRANGEBYSCORE与LIMIT参数,我们能够立即获取到某个时间戳之前或之后的10个、甚至是100个事件。我们也可以通过ZCOUNT计算某一特定时间段内事件发生的次数,甚至选择用Lua脚本实现自己的分析方式。以下的示例将通过Lua脚本计算在一个给定时间范围内各种不同的事件类型的数量。
import json
def count_types(conn, start, end):
counts = count_types_lua(keys=['events'], args=[start, end])
return json.loads(counts)
count_types_lua = conn.register_script('''
local counts = {}
local ids = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2])
for i, id in ipairs(ids) do
local type = redis.call('HGET', 'event:' .. id, 'type')
counts[type] = (counts[type] or 0) + 1
end
return cjson.encode(counts)
''')
这里所定义的count_types()函数首先将参数传递给经过封装的Lua脚本,并对经过json编码的事件类型与数量的映射进行解码。Lua脚本首先创建了一个结果表(对应counts变量),随后通过ZRANGEBYSCORE读取这一时间范围内的事件id的列表。当获取到这些id之后,脚本将一次性读取每个事件中的类型属性,让事件数量表保持不断增长,最后结束时返回一个经过json编码的映射结果。
对性能的思考以及数据建模正如代码所展示的一样,这个用于在特定时间范围内计算不同事件类型数量的方法能够正常工作,但这种方式需要对这一时间范围内的每个事件的类型属性进行大量的读取。对于包含几百或是几千个事件的时间范围来说,这样的分析是比较快的。但如果某个时间范围内饮食了几万乃至几百万个事件,情况又会如何呢?答案很简单,Redis在计算结果时将会阻塞。
有一种方法能够处理在分析事件流时,由于长时间的脚本执行而产生的性能问题,即预先考虑一下需要被执行的查询。具体来说,如果你知道你需要对某一段时间范围内的每种事件的总数进行查询,你就可以为每种事件类型使用一个额外的有序集合,每个集合只保存这种类型事件的id与时间戳对。当你需要计算每种类型事件的总数时,你可以执行一系列ZCOUNT或相同功能的方法调用 5 ,并返回该结果。让我们来看一下这个修改后的record_event()函数,它将保存基于事件类型的有序集合。
def record_event_by_type(conn, event):
id = conn.incr('event:id')
event['id'] = id
event_key = 'event:{id}'.format(id=id)
type_key = 'events:{type}'.format(type=event['type'])
ref = {id: event['timestamp']}
pipe = conn.pipeline(True)
pipe.hmset(event_key, event)
pipe.zadd('events', **ref)
pipe.zadd(type_key, **ref)
pipe.execute()
新的record_event_by_type()函数与旧的record_event()函数在许多方面都是相同的,但新加入了一些操作。在新的函数中,我们将计算一个type_key,这里将保存该事件在对应这一类型事件的有序集合中的位置索引。当id与时间戳对添加到events有序集合中后,我们还要将id与时间戳对添加到type_key这个有序集合中,随后与旧的方法一样执行数据插入操作。
现在,如果需要计算两个时间点之间“visit”这一类型的事件所发生的次数,我们只需在调用ZCOUNT命令时传入所计算的事件类型的特定键,以及开始与结束的时间戳。
def count_type(conn, type, start, end):
type_key = 'events:{type}'.format(type=type)
return conn.zcount(type_key, start, end)