详解Go语言I/O多路复用netpoller模型 (4)

netpollblock方法会判断当前的状态是不是处于pdReady,如果是那么直接返回true;如果不是,那么将gpp通过CAS设置为pdWait并退出 for 循环。通过gopark 把当前 goroutine 给 park 住,直到对应的 fd 上发生可读/可写或者其他I/O 事件为止。

这些被park住的goroutine会在goroutine的调度中调用runtime.netpoll被唤醒。

netpoll轮询等待

runtime.netpoll的核心逻辑是: 根据入参 delay设置调用 epoll_wait 的 timeout 值,调用 epoll_wait 从 epoll 的 eventpoll.rdllist双向列表中获取IO就绪的fd列表,遍历epoll_wait 返回的fd列表, 根据调用epoll_ctl注册fd时封装的上下文信息组装可运行的 goroutine 并返回。

执行完 netpoll 之后,会返回一个就绪 fd 列表对应的 goroutine 列表,接下来将就绪的 goroutine 加入到调度队列中,等待调度运行。

func netpoll(delay int64) gList { if epfd == -1 { return gList{} } var waitms int32 // 因为传入delay单位是纳秒,下面将纳秒转换成毫秒 if delay < 0 { waitms = -1 } else if delay == 0 { waitms = 0 } else if delay < 1e6 { waitms = 1 } else if delay < 1e15 { waitms = int32(delay / 1e6) } else { // An arbitrary cap on how long to wait for a timer. // 1e9 ms == ~11.5 days. waitms = 1e9 } var events [128]epollevent retry: // 等待文件描述符转换成可读或者可写 n := epollwait(epfd, &events[0], int32(len(events)), waitms) // 返回负值,那么重新调用epollwait进行等待 if n < 0 { ... goto retry } var toRun gList // 意味着被监控的文件描述符出现了待处理的事件 for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } ... // 判断发生的事件类型,读类型或者写类型 var mode int32 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { // 取出保存在 epollevent 里的 pollDesc pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) pd.everr = false if ev.events == _EPOLLERR { pd.everr = true } // 调用 netpollready,传入就绪 fd 的 pollDesc netpollready(&toRun, pd, mode) } } return toRun }

netpoll会调用epollwait获取就绪的 fd 列表,对应的epoll函数是epoll_wait。toRun是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方。如果epollwait返回的n大于零,那么表示被监控的文件描述符出现了待处理的事件,那么需要调用for循环进行处理。循环里面会根据时间类型设置mode,然后拿出对应的pollDesc,调用netpollready方法。

下面我们再看一下netpollready:

func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g // 获取对应的g的指针 if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } // 将对应的g加入到toRun列表中 if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } } func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { gpp := &pd.rg // 根据传入的mode判断事件类型 if mode == 'w' { gpp = &pd.wg } for { // 取出 gpp 存储的 g old := *gpp if old == pdReady { return nil } if old == 0 && !ioready { return nil } var new uintptr if ioready { new = pdReady } // cas 将读或者写信号量转换成 pdReady if atomic.Casuintptr(gpp, old, new) { if old == pdWait { old = 0 } // 返回对应的 g指针 return (*g)(unsafe.Pointer(old)) } } }

讲完了runtime.netpoll的源码有个需要注意的地方,调用runtime.netpoll的地方有两处:

在调度器中执行runtime.schedule(),该方法中会执行runtime.findrunable(),在runtime.findrunable()中调用了runtime.netpoll获取待执行的goroutine;

Go runtime 在程序启动的时候会创建一个独立的sysmon监控线程,sysmon 每 20us~10ms 运行一次,每次运行会检查距离上一次执行netpoll是否超过10ms,如果是则会调用一次runtime.netpoll;

这些入口的调用感兴趣的可以自己去看看。

总结

本文从I/O多路复用开始讲解select以及epoll,然后再回到go语言中去看它是如何实现多路复用这样的结构的。通过追踪源码可以发现,其实go也是根据epoll来封装自己的函数:

func netpollinit() func netpollopen(fd uintptr, pd *pollDesc) int32 func netpoll(block bool) gList

通过这三个函数来实现对epoll的创建实例、注册、事件等待操作。

对于I/O多路复用不是很了解的同学也可以借此机会多多的去学习一下网络编程方面的知识,扩充一下知识面。

Reference

https://www.infoq.cn/article/boeavgkiqmvcj8qjnbxk

https://zhuanlan.zhihu.com/p/64138532

https://imageslr.github.io/2020/02/27/select-poll-epoll.html

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsswds.html