Redis 命令执行过程(上)

今天我们来了解一下 Redis 命令执行的过程。在之前的文章中《当 Redis 发生高延迟时,到底发生了什么》我们曾简单的描述了一条命令执行过程,本篇文章展示深入说明一下,加深读者对 Redis 的了解。

如下图所示,一条命令执行完成并且返回数据一共涉及三部分,第一步是建立连接阶段,响应了socket的建立,并且创建了client对象;第二步是处理阶段,从socket读取数据到输入缓冲区,然后解析并获得命令,执行命令并将返回值存储到输出缓冲区中;第三步是数据返回阶段,将返回值从输出缓冲区写到socket中,返回给客户端,最后关闭client。

Redis 命令执行过程(上)

这三个阶段之间是通过事件机制串联了,在 Redis 启动阶段首先要注册socket连接建立事件处理器:

当客户端发来建立socket的连接的请求时,对应的处理器方法会被执行,建立连接阶段的相关处理就会进行,然后注册socket读取事件处理器

当客户端发来命令时,读取事件处理器方法会被执行,对应处理阶段的相关逻辑都会被执行,然后注册socket写事件处理器

当写事件处理器被执行时,就是将返回值写回到socket中。

Redis 命令执行过程(上)

接下来,我们分别来看一下各个步骤的具体原理和代码实现。

启动时监听socket

Redis 服务器启动时,会调用 initServer 方法,首先会建立 Redis 自己的事件机制 eventLoop,然后在其上注册周期时间事件处理器,最后在所监听的 socket 上
创建文件事件处理器,监听 socket 建立连接的事件,其处理函数为 acceptTcpHandler。

void initServer(void) { // server.c .... /** * 创建eventLoop */ server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); /* Open the TCP listening socket for the user commands. */ if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); /** * 注册周期时间事件,处理后台操作,比如说客户端操作、过期键等 */ if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } /** * 为所有监听的socket创建文件事件,监听可读事件;事件处理函数为acceptTcpHandler * */ for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } .... }

在《Redis 事件机制详解》一文中,我们曾详细介绍过 Redis 的事件机制,可以说,Redis 命令执行过程中都是由事件机制协调管理的,也就是 initServer 方法中生成的 aeEventLoop。当socket发生对应的事件时,aeEventLoop 对调用已经注册的对应的事件处理器。

Redis 命令执行过程(上)

建立连接和Client

当客户端向 Redis 建立 socket时,aeEventLoop 会调用 acceptTcpHandler 处理函数,服务器会为每个链接创建一个 Client 对象,并创建相应文件事件来监听socket的可读事件,并指定事件处理函数。

acceptTcpHandler 函数会首先调用 anetTcpAccept方法,它底层会调用 socket 的 accept 方法,也就是接受客户端来的建立连接请求,然后调用 acceptCommonHandler方法,继续后续的逻辑处理。

// 当客户端建立链接时进行的eventloop处理函数 networking.c void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { .... // 层层调用,最后在anet.c 中 anetGenericAccept 方法中调用 socket 的 accept 方法 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); /** * 进行socket 建立连接后的处理 */ acceptCommonHandler(cfd,0,cip); }

acceptCommonHandler 则首先调用 createClient 创建 client,接着判断当前 client 的数量是否超出了配置的 maxclients,如果超过,则给客户端发送错误信息,并且释放 client。

static void acceptCommonHandler(int fd, int flags, char *ip) { //networking.c client *c; // 创建redisClient c = createClient(fd) // 当 maxClient 属性被设置,并且client数量已经超出时,给client发送error,然后释放连接 if (listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; if (write(c->fd,err,strlen(err)) == -1) { } server.stat_rejected_conn++; freeClient(c); return; } .... // 处理为设置密码时默认保护状态的客户端连接 // 统计连接数 server.stat_numconnections++; c->flags |= flags; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywdwf.html