Redis 命令执行过程(上) (2)

createClient 方法用于创建 client,它代表着连接到 Redis 客户端,每个客户端都有各自的输入缓冲区和输出缓冲区,输入缓冲区存储客户端通过 socket 发送过来的数据,输出缓冲区则存储着 Redis 对客户端的响应数据。client一共有三种类型,不同类型的对应缓冲区的大小都不同。

普通客户端是除了复制和订阅的客户端之外的所有连接

从客户端用于主从复制,主节点会为每个从节点单独建立一条连接用于命令复制

订阅客户端用于发布订阅功能

Redis 命令执行过程(上)

createClient 方法除了创建 client 结构体并设置其属性值外,还会对 socket进行配置并注册读事件处理器

设置 socket 为 非阻塞 socket、设置 NO_DELAY 和 SO_KEEPALIVE标志位来关闭 Nagle 算法并且启动 socket 存活检查机制。

设置读事件处理器,当客户端通过 socket 发送来数据后,Redis 会调用 readQueryFromClient 方法。

client *createClient(int fd) { client *c = zmalloc(sizeof(client)); // fd 为 -1,表示其他特殊情况创建的client,redis在进行比如lua脚本执行之类的情况下也会创建client if (fd != -1) { // 配置socket为非阻塞、NO_DELAY不开启Nagle算法和SO_KEEPALIVE anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); /** * 向 eventLoop 中注册了 readQueryFromClient。 * readQueryFromClient 的作用就是从client中读取客户端的查询缓冲区内容。 * 绑定读事件到事件 loop (开始接收命令请求) */ if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } // 默认选择数据库 selectDb(c,0); uint64_t client_id; atomicGetIncr(server.next_client_id,client_id,1); c->id = client_id; c->fd = fd; .... // 设置client的属性 return c; }

client 的属性中有很多属性,比如后边会看到的输入缓冲区 querybuf 和输出缓冲区 buf,这里因为代码过长做了省略,感兴趣的同学可以自行阅读源码。

读取socket数据到输入缓冲区

readQueryFromClient 方法会调用 read 方法从 socket 中读取数据到输入缓冲区中,然后判断其大小是否大于系统设置的 client_max_querybuf_len,如果大于,则向 Redis返回错误信息,并关闭 client。

将数据读取到输入缓冲区后,readQueryFromClient 方法会根据 client 的类型来做不同的处理,如果是普通类型,则直接调用 processInputBuffer 来处理;如果是主从客户端,还需要将命令同步到自己的从服务器中。也就是说,Redis实例将主实例传来的命令执行后,继续将命令同步给自己的从实例。

Redis 命令执行过程(上)

// 处理从client中读取客户端的输入缓冲区内容。 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; .... if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 从 fd 对应的socket中读取到 client 中的 querybuf 输入缓冲区 nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { .... // 出错释放 client } else if (nread == 0) { // 客户端主动关闭 connection serverLog(LL_VERBOSE, "Client closed connection"); freeClient(c); return; } else if (c->flags & CLIENT_MASTER) { /* * 当这个client代表主从的master节点时,将query buffer和 pending_querybuf结合 * 用于主从复制中的命令传播???? */ c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread); } // 增加已经读取的字节数 sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->read_reploff += nread; server.stat_net_input_bytes += nread; // 如果大于系统配置的最大客户端缓存区大小,也就是配置文件中的client-query-buffer-limit if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); // 返回错误信息,并且关闭client bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } if (!(c->flags & CLIENT_MASTER)) { // processInputBuffer 处理输入缓冲区 processInputBuffer(c); } else { // 如果client是master的连接 size_t prev_offset = c->reploff; processInputBuffer(c); // 判断是否同步偏移量发生变化,则通知到后续的slave size_t applied = c->reploff - prev_offset; if (applied) { replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied); sdsrange(c->pending_querybuf,applied,-1); } } } 解析获取命令

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywdwf.html