Redis中单机数据库的实现 (2)

对于文件事件来说, 除了可读AE_READABLE与可写AE_WRITABLE两种监听触发方式外, 还有一种额外的AE_BARRIER触发方式. 若监听文件事件时, 将AE_BARRIER与AE_WRITABLE组合时, 保证若当前文件如果正在处理可读(被可读触发), 就不再同时触发可写. 在一些特殊场景下这个特性是比较有用的.

事件处理器aeEventLoop中, 对于文件事件的处理, 走的都是老套路. 这里需要注意的是事件处理器中, 定时事件的设计:

所有定时事件, 像糖葫芦一样, 串成一个链表

定时事件不是严格定时的, 定时事件中不适宜执行耗时操作

事件处理器的核心接口有以下几个:

// 创建一个事件处理器实例 aeEventLoop *aeCreateEventLoop(int setsize); // 销毁一个事件处理器实例 void aeDeleteEventLoop(aeEventLoop *eventLoop); // 事件处理器急停 void aeStop(aeEventLoop *eventLoop); // 创建一个文件事件, 并添加进事件处理器中 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); // 从事件处理器中删除一个文件事件. void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); // 通过文件描述符, 获取事件处理器中, 该文件事件注册的触发方式(可读|可写|BARRIER) int aeGetFileEvents(aeEventLoop *eventLoop, int fd); // 创建一个定时事件, 并挂在事件处理器的时间事件链表中. 返回的是创建好的定时事件的编号 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc); // 删除事件处理器中的某个时间事件 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); // 调用多路IO接口, 处理所有被触发的文件事件, 与所有到期的定时事件 int aeProcessEvents(aeEventLoop *eventLoop, int flags); // tricky接口.内部调用poll接口, 等待milliseconds毫秒, 或直至指定fd上的指定事件被触发 int aeWait(int fd, int mask, long long milliseconds); // 事件处理器启动器 void aeMain(aeEventLoop *eventLoop); // 获取底层路IO接口的名称 char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

核心代码如下:

首先是启动器, 启动器负责三件事:

响应急停

执行beforesleep回调, 如果设置了该回调的值, 那么在每次事件循环之前, 该函数会被执行

一遍一遍的轮大米, 调用aeProcessEvents, 即事件循环

void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }

然后是事件循环

int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; // 无事快速退朝 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { // 在 "事件处理器中至少有一个文件事件" 或 "以阻塞形式执行事件循环, 且处理定时事件" // 时, 进入该分支 // 这里有一点绕, 其实就是, 如果 // "事件处理器中没有监听任何文件事件", 且, // "执行事件循环时指明不执行定时事件, 或虽然指明了执行定时事件, 但是要求以非阻塞形式运行事件循环" // 都不会进入该分支, 不进入该分支时, 分两种情况: // 1. 没监听任何文件事件, 且不执行定时事件, 相当于直接返回了 // 2. 没监听任何文件事件, 执行定时事件, 但要求以非阻塞形式执行. 则立即执行所有到期的时间事件, 如果没有, 则相当于直接返回 // 第一步: 计算距离下一个到期的定时事件, 还有多长时间. 计该时间为tvp // 即便本次事件循环没有指明要处理定时事件, 也计算这个时间 int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; aeGetTime(&now_sec, &now_ms); tvp = &tv; /* How many milliseconds we need to wait for the next * time event to fire? */ long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; if (ms > 0) { tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 第二步: 以tvp为超时时间, 调用多路IO接口, 获取被触发的文件事件, 并处理文件事件 // 如果tvp为null, 即当前事件处理器中没有定时事件, 则调用aeApiPoll的超时时间是无限的 numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ // 通常情况下, 如果一个文件事件, 同时被可读与可写同时触发 // 都是先执行可读回调, 再执行可写回调 // 但如果事件掩码中带了AE_BARRIER, 就会扭转这个行为, 先执行可写回调, 再执行可读加高 int invert = fe->mask & AE_BARRIER; if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } if (invert && fe->mask & mask & AE_READABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } // 按需执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ } 3. 网络库

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsydf.html