服务端全局变量server中, 持有着一个命令表, 它本身是一个dict结构, 其key是为字符串, 即Redis命令的字符串, value则是为redisCommand结构, 描述了一个Redis中的命令. 可以看到processCommand一开始做的第一件事, 就是用c->argv[0]->ptr去查询对应的redisCommand句柄
Redis中的事务是以一个MULTI命令标记开始, 由EXEC命令标记结束的. 如果一个命令处于事务之中, 那么processCommand内部并不会真正执行这个命令, 仅当最终EXEC命令来临时, 将事务中的所有命令全部执行掉. 而实际执行命令的函数, 是为call, 这是Redis命令执行流程中最核心的一个函数.
客户端可以以pipelining的形式, 通过一次请求发送多个命令, 但这不是事务, 如果没有将命令包裹在MULTI和EXEC中, 那么这多个命令其实是一个个就地执行的.
有关事务的更详细细节我们会在稍后讨论, 但目前, 我们先来看一看这个最核心的call函数, 以下代码隐藏了无关细节
void call(client * c, int flags) { // ... // 与主从复制相关的代码: 将命令分发给所有MONITOR模式下的从实例 // ... // 调用命令处理函数 // ... c->cmd->proc(c); // ... // AOF, 主从复制相关代码 // ... }拨开其它特性不管, 单机数据库在这一步其实很简单, 就是调用了对应命令字的回调函数来处理
我们挑其中一个命令SET来看一下, 命令处理回调中都干了些什么:
void setCommand(client * c) { int j; robj * expire = NULL; int uint = UINT_SECONDS; int flags = OBJ_SET_NO_FLAGS; for (j = 3; i < c->argc; j++) { // 处理SET命令中的额外参数 // ... } c->argv[2] = tryObjectEncoding(c->argv[2]); setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL) } void setGenericCommand(client * c, int flags, robj * key, robj, * val, robj * expire, int unit, robj * ok_reply, robj * abort_reply) { long long milliseconds = 0; if (expire) { // 从string对象中读取数值化的过期时间 if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0){ addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } if ( (flags & OBJ_SET_NX && lookupKeyWrite(c->db, key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db, key) == NULL) ) { // NX 与 XX 时, 判断键是否存在 addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } // 执行所谓的set操作, 即把键值对插入至数据库键空间去, 如果键已存在, 则替换值 setKey(c->db, key, val); server.dirty++; // 统计数据更新 // 设置过期时间 if(expire) setExpire(c, c->db, key, kstime() + milliseconds); notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id); addReply(c, ok_reply ? ok_reply : shared.ok); } void setKey(redisDb * db, robj * key, robj * val) { if (lookupKeyWrite(db, key) == NULL) { dbAdd(db, key, val); // 对 dictAdd 的封装 } else { dbOverwrite(db, key, val); // 对 dictReplace的封装 } incrRefCount(val); removeExpire(db, key); signalModifiedKey(db, key); }