Redis中单机数据库的实现 (9)

一个命令及其所有参数如果被成功解析, 则processMultibulkBuffer函数会返回C_OK, 并在随后调用processCommand执行这个命令. processCommand内部包含了将命令的回应写入c->buf的动作(封装在函数addReply中. 如果命令或命令中的参数在解析过程出错, 或参数尚未接收完毕, 则会返回C_ERR, 退栈, 重新返回, 直至再次从传输层读取数据, 补齐所有参数, 再次回到这里.

接下来我们来看执行命令并写回包的过程, processCommand中的关键代码如下:

int processCommand(client * c) { if (!strcasecmp(c->argv[0]->ptr, "quit")) { // 对 quit 命令做单独处理, 调用addReply后, 返回 addReply(c, shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } // 拿出c->argv[0]中存储的string对象中的ptr字段(其实是个sds字符串) // 与server.commands中的命令字表进行比对, 查找出对应的命令, 即 struct redisCommand* 句柄 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { // 查无该命令, 进行错误处理 flagTransaction(c); // 如果这是一个在事务中的命令, 则将 CLIENT_DIRTY_EXEC 标志位贴到 c-flags 中, 指示着事务出错 addReplyErrorFormat(c, "unknown command '%s'", (char *)c->argv[0]->ptr) return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { // 对比命令所需要的参数数量, 是否和argc中的数量一致, 如果不一致, 说明错误 flagTransaction(c); // 依然同上, 事务出错, 则打 CLIENT_DIRTY_EXEC 标志位 addReplyErrorFormat(c, "wrong number of arguments for '%s' command", c->cmd->name); return C_OK; } // 检查用户的登录态, 即在服务端要求登录认证时, 若当前用户还未认证, 且当前命令不是认证命令的话, 则报错 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { flagTransaction(c); addReply(c, shared.noautherr); return C_OK; } // 与集群相关的代码 // 如果集群模式开启, 则通常情况下需要将命令重定向至其它实例, 但在以下两种情况下不需要重定向: // 1. 命令的发送者是master // 2. 命令没有键相关的参数 if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { int hashslot, error_code; clusterNode * n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code); if (n == NULL || n != server.cluster->myseld) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } clusterRedirectClient(c, n, hashslot, error_code); // 重定向 return C_OK; } } // 在正式执行命令之前, 首先先尝试的放个屁, 挤点内存出来 if (server.maxmemory) { int retval = freeMemoryIfNeeded(); if (server.current_client == NULL) return C_ERR; if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR ) { flagTransaction(c); addReply(c, shared.oomerr); return C_OK; } } // 在持久化出错的情况下, master不接受写操作 if ( ( (server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) || server.aof_last_write_status == C_ERR ) && server.masterhost == NULL && (c->cmdd->flags & CMD_WRITE || c->cmd->proc == pingCommand) ) { flagTransaction(c); if (server.aof_last_write_status == C_OK) { addReply(c, shared.bgsaveerr); } else { addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno))); } return C_OK; } // 还有几种不接受写操作的场景, 这里代码省略掉, 这几种场景包括: // 1. 用户配置了 min_slaves-to_write配置项, 而符合要求的slave数量不达标 // 2. 当前是一个只读slave // ... // 在发布-订阅模式中, 仅支持 (P)SUBSCRIBE/(P)UNSUBSCRIBE/PING/QUIT 命令 if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand ) { addReplyError(c, "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); return C_OK; } // 当用户配置slave-serve-stale-data为no, 且当前是一个slave, 且与master中断连接的情况下, 仅支持 INFO 和 SLAVEOF 命令 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_server_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) { flagTransaction(c); addReply(c, shared.masterdownerr); return C_OK; } // 如果当前正在进行loading操作, 则仅接受 LOADING 相关命令 if (server.loading && !(c->cmd->flags & CMD_LOADING)) { addReply(c, shared.loadingerr); return C_OK; } // 如果是lua脚本操作, 则支持受限数量的命令 // ... // 终于到了终点: 执行命令 // 对于事务中的命令: 进队列, 但不执行 // 对于事务结束标记EXEC: 调用call, 执行队列中的所有命令 // 对于非事务命令: 也是调用call, 就地执行 if ( c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { // 如果是事务, 则将命令先排进队列 queueMultiCommand(c); addReply(c, shared.queued); } else { // 如果是非事务执行, 则直接执行 call (c, CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) { handleClientsBlockedOnLists(); } } return C_OK; }

需要注意的点有:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsydf.html