假设这里的底层实现,就是前面讲的select函数,那么,select函数执行完后,eventLoop->fired 属性,就会存放这次select筛选出来的那些,ready的文件描述符集合。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; /** * 拷贝到带_的变量中 */ memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); // 1 retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[j]; if (fe->mask == AE_NONE) continue; if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; // 2 eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }如上所示,1处,调用select;2处,赋值给fired。
2处,从fired中取出对应的文件描述符
3处,如果fired中的文件描述符,可读,则执行对应的函数指针rfileProc指向的函数
4处,如果fired中的文件描述符,可写,则执行对应的函数指针wfileProc指向的函数
如果有周期任务,则执行周期任务 /* Check time events */ // 执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);这里会调用processTimeEvents,其实现如下,其中涉及到复杂的时间计算,我们可以只看核心流程:
/* Process time events * * 处理所有已到达的时间事件 */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; time_t now = time(NULL); // 更新最后一次处理时间事件的时间 eventLoop->lastTime = now; // 遍历链表 // 执行那些已经到达的事件 te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId - 1; while (te) { long now_sec, now_ms; long long id; // 获取当前时间 aeGetTime(&now_sec, &now_ms); // 如果当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件 if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; //1 执行事件处理器,并获取返回值 retval = te->timeProc(eventLoop, id, te->clientData); processed++; // 记录是否有需要循环执行这个事件时间 if (retval != AE_NOMORE) { // 2 是的, retval 毫秒之后继续执行这个时间事件 aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms); } else { // 不,将这个事件删除 aeDeleteTimeEvent(eventLoop, id); } // 因为执行事件之后,事件列表可能已经被改变了 // 因此需要将 te 放回表头,继续开始执行事件 te = eventLoop->timeEventHead; } else { te = te->next; } } return processed; }
1处,执行timeProc这个函数指针,执行的函数,在初始化的时候,这个指针,被赋值为serverCron;
初始化时,会调用一下代码:
// 为 serverCron() 创建时间事件 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); }这里的serverCron,是一个函数指针。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { // 更新时间计数器 long long id = eventLoop->timeEventNextId++; // 创建时间事件结构 aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; // 设置 ID te->id = id; // 设定处理事件的时间 aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms); // 1 设置事件处理器 te->timeProc = proc; te->finalizerProc = finalizerProc; // 设置私有数据 te->clientData = clientData; // 将新事件放入表头 te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; }上面的1处,将传入的serverCron,赋值给了te->timeProc。
2处,注册下一次的周期任务
总结本讲主要讲解了主循环的最外层结构,如果有什么不清楚的,可以留言。