看看epollWait的native实现
// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) { struct epoll_event *events = jlong_to_ptr(address); int res; if (timeout <= 0) { /* Indefinite or no wait */ // epoll_wait参数的含义是 // epfd,创建的epoll句柄 // events是一个结构体指针,如果有IO事件发生,linux会将事件放在这个结构体中返回 // numfds是上面指针指向的结构体的个数,也就是最多能接收的IO事件的个数 // timeout是超时时间 RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res); } else { /* Bounded wait; bounded restarts */ res = iepoll(epfd, events, numfds, timeout); } if (res < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed"); } return res; }从native实现上可以看出最终调用了epoll_wait(在timeout >= 0),接着看看epoll_wait的里一个参数events的来源。在上一篇文章里面我们说了channel注册的时候会将自己需要监听的事件类型保存在sun.nio.ch.EPollArrayWrapper#eventsLow中,而上面EPollArrayWrapper#poll中又调用了updateSelectedKeys来注册每个socket监听的事件
// sun.nio.ch.EPollArrayWrapper#getUpdateEvents // 获取需要监听的文件描述符对应的事件 private byte getUpdateEvents(int fd) { // 如果没有超出预定义的数组大小则直接从数组中获取 if (fd < MAX_UPDATE_ARRAY_SIZE) { return eventsLow[fd]; } else { // 超出预订单数组大小的部分从map中获取 Byte result = eventsHigh.get(Integer.valueOf(fd)); // result should never be null return result.byteValue(); } } // sun.nio.ch.EPollArrayWrapper#updateRegistrations // 这个方法是在epoll_wait前把需要监听的文件描述符及其需要监听的事件注册到epoll上 private void updateRegistrations() { synchronized (updateLock) { int j = 0; // 每调用一次setInterest,updateCount加1 while (j < updateCount) { // 需要监听的文件描述符 int fd = updateDescriptors[j]; // 需要监听的事件,比如channel注册之后的事件是 short events = getUpdateEvents(fd); boolean isRegistered = registered.get(fd); int opcode = 0; if (events != KILLED) { if (isRegistered) { opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; } else { opcode = (events != 0) ? EPOLL_CTL_ADD : 0; } if (opcode != 0) { // 调用epollCtl来add、update或者delete对应文件描述符坚挺的事件 epollCtl(epfd, opcode, fd, events); if (opcode == EPOLL_CTL_ADD) { registered.set(fd); } else if (opcode == EPOLL_CTL_DEL) { registered.clear(fd); } } } j++; } updateCount = 0; } }上面epollCtl又是一个native方法
// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) { struct epoll_event event; int res; event.events = events; event.data.fd = fd; // opcode,EPOLL_CTL_ADD(注册新的fd到epfd), EPOLL_CTL_MOD(修改已经注册的fd的监听事件), EPOLL_CTL_DEL(从epfd删除一个fd); // fd,需要监听的socket对应的文件描述符 // event,该文件描述符监听的事件 RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res); // 省略中间代码... } 关于select过程中的中断说明 这里的中断是什么?这里中断并不是操作系统层面的中断,只是中断epoll_wait。由于epoll_wait可能会阻塞等待IO事件(timeout = -1),这里的中断就是指中断epoll_wait,即时返回。也就是让select即时返回
这里的中断是怎么实现的?由于epoll_wait处在等待的情况下的时候,如果有文件描述符上有事件发生,epoll_wait就会返回,所以基本思路就是在epoll监控的文件描述符上产生IO事件,具体实现原理就是使用管道创建两个文件描述符fd0,fd1(EPollSelectorImpl),fd0用来作为读描述符,fd1作为写描述符,然后将读描述符注册到epoll上,如果向fd1写内容,epoll发现fd0有IO事件就会返回,起到了让epoll_wait及时返回的作用。
什么时候会中断调用Thread.interrupt()
selector关闭的时候
可以直接调用sun.nio.ch.EPollSelectorImpl#wakeup,这是一个public方法
中断的方法是sun.nio.ch.EPollArrayWrapper#interrupt(),这个方法会调用一个native方法
// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd) { int fakebuf[1]; fakebuf[0] = 1; // 传入的文件描述符是sun.nio.ch.EPollArrayWrapper#outgoingInterruptFD,也就是创建的pipe的写文件描述符fd1,向pipe的fd1写入一个字节的1 if (write(fd, fakebuf, 1) < 0) { JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed"); } }这个时候epoll_wait就收到中断文件描述符sun.nio.ch.EPollArrayWrapper#incomingInterruptFD,也就是创建的pipe的读文件描述符上有IO事件产生,epoll_wait可以返回。
所以调用到方法EPollArrayWrapper#interrupt()就可以中断文件描述符,而方法EPollSelectorImpl#wakeup调用了EPollArrayWrapper#interrupt()。
那么为什么调用Thread.interrupt()的时候也会中断epoll_wait呢?