4处,开始遍历当前数据库的过期键,最多遍历20次,这里的num,被ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP赋值,这个值定义为20,也就是说,每次扫描一个库中,20个过期键
5处,如果键已过期,则将这个key过期掉,比如从当前数据库删除,发布事件等等
6处,计算一些统计数据
7处,遍历16次,检查下是否已经执行了足够长的时间;因为redis是单线程的,不能一直执行过期键清理任务,还要处理客户端请求呢,所以,这里每执行16次循环,就检查下时间,看看是否已经超时,超时直接返回。
8处,超时返回
讲完了主动过期,接着讲前面的流程,2处,涉及一些主从复制相关的东西,这块放到后面吧
3处,将 aof 从缓存中,刷到磁盘
这个方法比较长,在后面分段讲解
刷新aof缓存到磁盘的执行过程判断是否有正在进行中的任务
void flushAppendOnlyFile(int force) { ssize_t nwritten; int sync_in_progress = 0; // 缓冲区中没有任何内容,直接返回 if (sdslen(server.aof_buf) == 0) return; // 策略为每秒 FSYNC if (server.aof_fsync == AOF_FSYNC_EVERYSEC) //1 是否有 SYNC 正在后台进行? sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;1处,会去判断一个全局变量,该变量是一个队列,用于存储后台任务。另外一个后台线程(没错,redis不是单纯的单线程,还是有其他线程的),会去该队列取任务,取不到就阻塞;取到了则执行。而刷新 aof 到磁盘这种重io的工作,就是被封装为一个任务,丢到这个队列中的。所以,这里去判断队列的大小是否为0.
/* Return the number of pending jobs of the specified type. * * 返回等待中的 type 类型的工作的数量 */ unsigned long long bioPendingJobsOfType(int type) { unsigned long long val; pthread_mutex_lock(&bio_mutex[type]); // 1 val = bio_pending[type]; pthread_mutex_unlock(&bio_mutex[type]); return val; }1处这里的val,就是存储指定类型的任务的数量。我们这里传入的type为 REDIS_BIO_AOF_FSYNC,所以就是看看:aof 刷盘的任务数量。
调用write函数执行写入
// 1 nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); if (nwritten != (signed)sdslen(server.aof_buf)) { // 2 ... }else{ // 3 /* Successful write(2). If AOF was in error state, restore the * OK state and log the event. */ // 写入成功,更新最后写入状态 if (server.aof_last_write_status == REDIS_ERR) { redisLog(REDIS_WARNING, "AOF write error looks solved, Redis can write again."); server.aof_last_write_status = REDIS_OK; } }
1处,执行写入,将server.aof_buf这个缓冲区的内容,写入aof文件,写入的字节长度为sdslen(server.aof_buf)。也就是,将整个缓冲区写入。
2处,如果写入的长度,不等于缓冲区的长度,表示只写了一部分,进入异常分支
为什么写入的会比预期的少,我们看看官方说明:
write() writes up to count bytes from the buffer pointed buf to the file referred to by the file descriptor fd. The number of bytes written may be less than count if, for example, there is insufficient space on the underlying physical medium, or the RLIMIT_FSIZE resource limit is encountered (see setrlimit(2)), or the call was interrupted by a signal handler after having written less than count bytes. (See also pipe(7).)这里的第二段就说了,可能是因为底层物理介质的空间不够;进程的资源限制;或者被中断。
3处,写入成功;更新状态,如果上一次aof写入状态为error,这次改为ok
flush到磁盘
前面write是写入到操作系统的os cache中,但是还没有落盘。必须执行flush之后,才会刷盘。
// 总是执行 fsnyc if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ // 1 aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ // 更新最后一次执行 fsnyc 的时间 server.aof_last_fsync = server.unixtime; // 策略为每秒 fsnyc ,并且距离上次 fsync 已经超过 1 秒 } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { // 2 放到后台执行 if (!sync_in_progress) aof_background_fsync(server.aof_fd); // 更新最后一次执行 fsync 的时间 server.aof_last_fsync = server.unixtime; }
1处,如果aof策略为:AOF_FSYNC_ALWAYS,则调用fsync,刷盘