但这样会有一个问题: 当在processMultiBulkBuffer中, 将超大参数以string对象的形式放在client->argv中时, 创建这个string对象, 底层是拷贝操作:
int processMultibulkBuffer(client * c) { // .... c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen); // .... }对于超大参数来说, 这种string对象的创建过程, 涉及到的内存分配与数据拷贝开销是很昂贵的. 为了避免这种数据拷贝, Redis把对于超大参数的处理优化成了如下:
7.2 对二进制数据进行协议解析从传输层获取到数据之后, 下一步就是传递给processInputBuffer函数, 来进行协议解析, 如下:
void processInputBuffer(client * c) { server.current_client = c; // 每次循环, 解析并执行出一个命令字(及其参数), 并执行相应的命令 // 直到解析失败, 或缓冲区数据耗尽为止 while(sdslen(c->querybuf)) { // 在某些场合下, 不做处理 if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; if (c->flags & CLIENT_BLOCKED) break; if (c->flag & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; // 如果client的reqtype字段中没有标明该客户端传递给服务端的请求类型(是内联命令, 还是协议数据) // 那么分析缓冲区中的第一个字节来决定. 这一般发生在客户端向服务端发送第一个请求的时候 if (!c->reqtype) { if (c->querybuf[0] == '*') c->reqtype = PROTO_REQ_MULTIBULK; else c->reqtype = PROTO_REQ_INLINE; } // 按类型解析出一个命令, 及其参数 if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknow request type") } // 执行命令逻辑 if (c->argc == 0) { // 当argc的值为0时, 代表本次解析没有解析出任何东西, 数据已耗尽 resetClient(c); } else { // 执行逻辑 if(processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { c->reploff = c->read_reploff - sdslen(c->querybuf); } if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) { resetClient(c); } } if (server.current_client == NULL) break; } } server.current_client = NULL; }这一步做了分流处理, 如果是内联命令, 则通过processInlineBuffer进行协议解析, 如果不是内联命令, 则通过processMultibulkBuffer进行协议解析. 协议解析完成后, 客户端上载的数据就被转换成了command, 然后接下来调用processCommand来执行服务端逻辑.
我们略过内联命令的解析, 直接看非内联命令请求的协议解析过程
int processMultibulkBuffer(client * c) { //... // 解析一个请求的头部时, 即刚开始解析一个请求时, multibulklen代表着当前命令的命令+参数个数, 该字段初始值为0 // 以下条件分支将读取一个命令的参数个数, 比如 '*1\r\n$4\r\nPING\r\n' // 以下分支完成后, 将有: // c->multibulklen = 1 // c->argv = zmalloc(sizeof(robj*)) // pos = <指向$> if (c->multibulklen == 0) { // ... // 读取当前命令的参数个数, 即赋值c->multibulklen newline = strchr(c->querybuf, '\r'); if (newline == NULL) { // 请求体不满足协议格式 // ... return C_ERR: } // ... ok = string2ll(c->querybuf+1, newline-(c->querybuf+1), &ll) if (!ok || ll > 1024 * 1024) { // 解析数值出错, 或数值大于1MB return C_ERR; } // 现在pos指向的是命令字 pos = (newline-c->querybuf) + 2 if (ll <= 0) { // 解出来一个空命令字, 什么也不做, 把缓冲区头部移除掉, 并且返回成功 sdsrange(c->querybuf, pos, -1); return C_OK; } c->multibulklen = ll; // 为命令字及其参数准备存储空间: 将它们都存储在c->argv中, 且是以redisObject的形式存储着 if (c->argv) zfree(c->argv); c->argv = zmalloc(sizeof(robj*)*c->multibulklen); } // ... // 读取解析命令字本身与其所有参数 while(c->multibulklen) { // 读取当前长字符串的长度 if (c->bulklen == -1) { // ... // 如果当前指针指向的参数, 是一个超大参数, 则把接收缓冲区修剪为该参数 pos += newline-(c->querybuf+pos) + 2; // 现在pos指向的是参数, 或命令字的起始位置 if (ll >= PROTO_MBULK_BIG_ARG) { size_t qblen; sdsrange(c->querybuf, pos, -1); // 将接收缓冲区修剪为该参数 pos = 0; qblen = sdslen(c->querybuf); if (qblen < (size_t)ll + 2) { // 如果这个超大参数还未接收完毕, 则扩充接收缓冲区, 为其预留空间 c->querybuf = sdsMakeRoomFor(c->querybuf, ll+2-qblen); } } c->bulklen == ll; } // 读取长字符串内容, 即是命令字本身, 或某个参数 if (sdslen(c->querybuf) - pos < (size_t)(c->bulklen+2)) { // 当前缓冲区中, 该长字符串的内容尚未接收完毕, 跳出解析 break; } else { if (pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { // 对于超大参数, 为了避免数据复制的开销, 直接以接收缓冲区为底料, 创建一个字符串对象 // 而对于接收缓冲区, 为其重新分配内存 // 注意, 如果当前指针指向的是一个超大参数, 接收机制保证了, 这个超大参数之后, 缓冲区没有其它数据 c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf) sdsIncrLen(c->querybuf, -2); // 移除掉超大参数末尾的\r\n c->querybuf = sdsnewlen(NULL, c->bulklen + 2); // 为接收缓冲区重新分配内存 sdsclear(c->querybuf); pos = 0; } else { // 对于普通参数或命令字, 以拷贝形式新建字符串对象 c->argv[c->argc++] = createStringObject(c->querybuf+pos, c->bulklen); pos += c->bulklen+2; // pos指向下一个参数 } c->bulklen = -1; c->multibulklen--; } } // 至此, 一个命令, 及其所有参数, 均被存储在 c->argv中, 以redisObject形式存储着, 对象的类型是string // 扔掉所有已经解析过的二进制数据 if (pos) sdsrange(c->querybuf, pos, -1); if (c->multibulklen == 0) return C_OK return C_ERR; } 7.3 命令的执行