因为我的描述比较抽象,这里拿一段 man select中的说明给大家看下:
select() allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking.直译一下:select() 允许一个程序去监听多个文件描述符,等待直到1个或多个文件描述符变成 ready状态,该状态下,可以不阻塞地读写该文件描述符。
3处,事件事件,主要用来周期执行,执行一些redis的后台任务,如删除过期key,后面细讲。
4处,指向当前正在使用的多路复用库的相关数据,目前redis支持:select、epoll、kqueue、evport
5处,在处理事件前,要执行的一个函数
再回头来看前面的代码:
// 1 aeSetBeforeSleepProc(server.el, beforeSleep); aeMain(server.el);这里的1处,就是设置前面第5点提到的,设置处理事件前,先要执行的一个函数。
事件循环处理器的主循环 void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 如果有需要在事件处理前执行的函数,那么运行它 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 开始处理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }可以看到,一共2个部分,首先执行eventLoop的事件处理前要执行的函数;接着再开始处理事件。
事件处理前的前置执行函数这里讲解下面这一句:
eventLoop->beforesleep(eventLoop);这个函数,在前面已经看到了,被赋值为:
aeSetBeforeSleepProc(server.el, beforeSleep);这个 beforeSleep如下:
void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ // 1 执行一次快速的主动过期检查 if (server.active_expire_enabled && server.masterhost == NULL) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); // 2 ... /* Write the AOF buffer on disk */ // 3 将 AOF 缓冲区的内容写入到 AOF 文件 flushAppendOnlyFile(0); /* Call the Redis Cluster before sleep function. */ // 在进入下个事件循环前,执行一些集群收尾工作 if (server.cluster_enabled) clusterBeforeSleep(); }
1,这里会去执行主动的过期检查,大致流程代码如下:
void activeExpireCycle(int type) { /* This function has some global state in order to continue the work * incrementally across calls. */ // 静态变量,用来累积函数连续执行时的数据 static unsigned int current_db = 0; /* Last DB tested. */ ... unsigned int j, iteration = 0; // 默认每次处理的数据库数量 unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL; // 函数开始的时间 long long start = ustime(), timelimit; dbs_per_call = server.dbnum; timelimit = 1000000 * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC / server.hz / 100; timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; // 1 遍历数据库 for (j = 0; j < dbs_per_call; j++) { int expired; // 指向要处理的数据库 redisDb *db = server.db + (current_db % server.dbnum); current_db++; do { unsigned long num, slots; long long now, ttl_sum; int ttl_samples; /* If there is nothing to expire try next DB ASAP. */ // 2 获取数据库中带过期时间的键的数量 如果该数量为 0 ,直接跳过这个数据库 if ((num = dictSize(db->expires)) == 0) { db->avg_ttl = 0; break; } // 3 获取数据库中键值对的数量 slots = dictSlots(db->expires); // 当前时间 now = mstime(); // 每次最多只能检查 LOOKUPS_PER_LOOP 个键 if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; // 4 开始遍历数据库 while (num--) { dictEntry *de; long long ttl; // 从 expires 中随机取出一个带过期时间的键 if ((de = dictGetRandomKey(db->expires)) == NULL) break; // 计算 TTL ttl = dictGetSignedIntegerVal(de) - now; // 5 如果键已经过期,那么删除它,并将 expired 计数器增一 if (activeExpireCycleTryExpire(db, de, now)) expired++; } // 6 为这个数据库更新平均 TTL 统计数据 ... // 更新遍历次数 iteration++; // 7 每遍历 16 次执行一次 if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */ (ustime() - start) > timelimit) { // 如果遍历次数正好是 16 的倍数 // 并且遍历的时间超过了 timelimit // 那么断开 timelimit_exit timelimit_exit = 1; } // 8 已经超时了,返回 if (timelimit_exit) return; /* We don't repeat the cycle if there are less than 25% of keys * found expired in the current DB. */ // 如果已删除的过期键占当前总数据库带过期时间的键数量的 25 % // 那么不再遍历 } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP / 4); } }这个函数,删减了一部分,留下了主流程:
1处,遍历数据库,一般就是遍历16个库
2处,获取当前库中,过期键的数量,过期键都存储在db->expires中,只需要算这个map的size即可;如果没有要过期的,处理下一个库
3处,获取过期键的数量