Redis 命令执行过程(上) (3)

processInputBuffer 主要是将输入缓冲区中的数据解析成对应的命令,根据命令类型是 PROTO_REQ_MULTIBULK 还是 PROTO_REQ_INLINE,来分别调用 processInlineBuffer 和 processMultibulkBuffer 方法来解析命令。

然后调用 processCommand 方法来执行命令。执行成功后,如果是主从客户端,还需要更新同步偏移量 reploff 属性,然后重置 client,让client可以接收一条命令。

void processInputBuffer(client *c) { // networking.c server.current_client = c; /* 当缓冲区中还有数据时就一直处理 */ while(sdslen(c->querybuf)) { .... // 处理 client 的各种状态 /* 判断命令请求类型 telnet发送的命令和redis-cli发送的命令请求格式不同 */ if (!c->reqtype) { if (c->querybuf[0] == '*') { c->reqtype = PROTO_REQ_MULTIBULK; } else { c->reqtype = PROTO_REQ_INLINE; } } /** * 从缓冲区解析命令 */ if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknown request type"); } /* 参数个数为0时重置client,可以接受下一个命令 */ if (c->argc == 0) { resetClient(c); } else { // 执行命令 if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { // 如果是master的client发来的命令,则 更新 reploff c->reploff = c->read_reploff - sdslen(c->querybuf); } // 如果不是阻塞状态,则重置client,可以接受下一个命令 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) resetClient(c); } } } server.current_client = NULL; }

解析命令暂时不看,就是将 redis 命令文本信息,记录到client的argv/argc属性中

执行命令

processCommand 方法会处理很多逻辑,不过大致可以分为三个部分:首先是调用 lookupCommand 方法获得对应的 redisCommand;接着是检测当前 Redis 是否可以执行该命令;最后是调用 call 方法真正执行命令。

processCommand会做如下逻辑处理:

1 如果命令名称为 quit,则直接返回,并且设置客户端标志位。

2 根据 argv[0] 查找对应的 redisCommand,所有的命令都存储在命令字典 redisCommandTable 中,根据命令名称可以获取对应的命令。

3 进行用户权限校验。

4 如果是集群模式,处理集群重定向。当命令发送者是 master 或者 命令没有任何 key 的参数时可以不重定向。

5 预防 maxmemory 情况,先尝试回收一下,如果不行,则返回异常。

6 当此服务器是 master 时:aof 持久化失败时,或上一次 bgsave 执行错误,且配置 bgsave 参数和 stop_writes_on_bgsave_err;禁止执行写命令。

7 当此服务器时master时:如果配置了 repl_min_slaves_to_write,当slave数目小于时,禁止执行写命令。

8 当时只读slave时,除了 master 的不接受其他写命令。

9 当客户端正在订阅频道时,只会执行部分命令。

10 服务器为slave,但是没有连接 master 时,只会执行带有 CMD_STALE 标志的命令,如 info 等

11 正在加载数据库时,只会执行带有 CMD_LOADING 标志的命令,其余都会被拒绝。

12 当服务器因为执行lua脚本阻塞时,只会执行部分命令,其余都会拒绝

13 如果是事务命令,则开启事务,命令进入等待队列;否则直接执行命令。

int processCommand(client *c) { // 1 处理 quit 命令 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } /** * 根据 argv[0] 查找对应的 command * 2 命令字典查找指定命令;所有的命令都存储在命令字典中 struct redisCommand redisCommandTable[]={} */ c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { // 处理未知命令 } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { // 处理参数错误 } // 3 检查用户验证 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { flagTransaction(c); addReply(c,shared.noautherr); return C_OK; } /** * 4 如果是集群模式,处理集群重定向。当命令发送者是master或者 命令没有任何key的参数时可以不重定向 */ if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { int hashslot; int error_code; // 查询可以执行的node信息 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, &hashslot,&error_code); if (n == NULL || n != server.cluster->myself) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } clusterRedirectClient(c,n,hashslot,error_code); return C_OK; } } // 5 处理maxmemory请求,先尝试回收一下,如果不行,则返回异常 if (server.maxmemory) { int retval = freeMemoryIfNeeded(); .... } /** * 6 当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误, * 且配置bgsave参数和stop_writes_on_bgsave_err;禁止执行写命令 */ if (((server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) || server.aof_last_write_status == C_ERR) && server.masterhost == NULL && (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)) { .... } /** * 7 当此服务器时master时:如果配置了repl_min_slaves_to_write, * 当slave数目小于时,禁止执行写命令 */ if (server.masterhost == NULL && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & CMD_WRITE && server.repl_good_slaves_count < server.repl_min_slaves_to_write) { .... } /** * 8 当时只读slave时,除了master的不接受其他写命令 */ if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE) { .... } /** * 9 当客户端正在订阅频道时,只会执行以下命令 */ if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { .... } /** * 10 服务器为slave,但没有正确连接master时,只会执行带有CMD_STALE标志的命令,如info等 */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) {...} /** * 11 正在加载数据库时,只会执行带有CMD_LOADING标志的命令,其余都会被拒绝 */ if (server.loading && !(c->cmd->flags & CMD_LOADING)) { .... } /** * 12 当服务器因为执行lua脚本阻塞时,只会执行以下几个命令,其余都会拒绝 */ if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != replconfCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && !(c->cmd->proc == scriptCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) {....} /** * 13 开始执行命令 */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { /** * 开启了事务,命令只会入队列 */ queueMultiCommand(c); addReply(c,shared.queued); } else { /** * 直接执行命令 */ call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); } return C_OK; } struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0}, .... // 所有的 redis 命令都有 }

call 方法是 Redis 中执行命令的通用方法,它会处理通用的执行命令的前置和后续操作。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywdwf.html