下面再看看poll_runtime_pollOpen方法:
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { pd := pollcache.alloc() lock(&pd.lock) if pd.wg != 0 && pd.wg != pdReady { throw("runtime: blocked write on free polldesc") } if pd.rg != 0 && pd.rg != pdReady { throw("runtime: blocked read on free polldesc") } pd.fd = fd pd.closing = false pd.everr = false pd.rseq++ pd.rg = 0 pd.rd = 0 pd.wseq++ pd.wg = 0 pd.wd = 0 pd.self = pd unlock(&pd.lock) var errno int32 errno = netpollopen(fd, pd) return pd, int(errno) } func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) }poll_runtime_pollOpen方法会通过pollcache.alloc初始化总大小约为 4KB的pollDesc结构体。然后重置pd的属性,调用netpollopen向epoll实例epfd加入新的轮询事件监听文件描述符的可读和可写状态。
下面我们再看看pollCache是如何初始化pollDesc的。
type pollCache struct { lock mutex first *pollDesc } const pollBlockSize = 4 * 1024 func (c *pollCache) alloc() *pollDesc { lock(&c.lock) // 初始化首节点 if c.first == nil { const pdSize = unsafe.Sizeof(pollDesc{}) n := pollBlockSize / pdSize if n == 0 { n = 1 } mem := persistentalloc(n*pdSize, 0, &memstats.other_sys) // 初始化pollDesc链表 for i := uintptr(0); i < n; i++ { pd := (*pollDesc)(add(mem, i*pdSize)) pd.link = c.first c.first = pd } } pd := c.first c.first = pd.link lockInit(&pd.lock, lockRankPollDesc) unlock(&c.lock) return pd }pollCache的链表头如果为空,那么初始化首节点,首节点是一个pollDesc的链表头,每次调用该结构体都会返回链表头还没有被使用的pollDesc。
到这里就完成了net.Listen的分析,下面我们看看listen.Accept。
Listener.AcceptListener.Accept方法最终会调用到netFD的accept方法中:
func (fd *netFD) accept() (netfd *netFD, err error) { // 调用netfd.FD的Accept接受新的 socket 连接,返回 socket 的 fd d, rsa, errcall, err := fd.pfd.Accept() ... // 构造一个新的netfd if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil { poll.CloseFunc(d) return nil, err } // 调用 netFD 的 init 方法完成初始化 if err = netfd.init(); err != nil { netfd.Close() return nil, err } lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd, nil }这个方法首先会调用到FD的Accept接受新的 socket 连接,并返回新的socket对应的fd,然后调用newFD构造一个新的netfd,并通过init 方法完成初始化。
init方法上面我们已经看过了,下面我们来看看Accept方法:
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { ... for { // 使用 linux 系统调用 accept 接收新连接,创建对应的 socket s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EINTR: continue case syscall.EAGAIN: if fd.pd.pollable() { // 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里 if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: continue } return -1, nil, errcall, err } }FD.Accept方法会使用 linux 系统调用 accept 接收新连接,创建对应的 socket,如果没有可读的消息,waitRead会被阻塞。这些被park住的goroutine会在goroutine的调度中调用runtime.netpoll被唤醒。
pollWait事件等待pollDesc.waitRead实际上是调用了runtime.poll_runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int { ... // 进入 netpollblock 并且判断是否有期待的 I/O 事件发生 for !netpollblock(pd, int32(mode), false) { ... } return 0 } func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } // 这个 for 循环是为了等待 io ready 或者 io wait for { old := *gpp // gpp == pdReady 表示此时已有期待的 I/O 事件发生, // 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作 if old == pdReady { *gpp = 0 return true } if old != 0 { throw("runtime: double wait") } // 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环 if atomic.Casuintptr(gpp, 0, pdWait) { break } } if waitio || netpollcheckerr(pd, mode) == 0 { // 让出当前线程,将 Goroutine 转换到休眠状态并等待运行时的唤醒 gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5) } // be careful to not lose concurrent pdReady notification old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady }poll_runtime_pollWait会用for循环调用netpollblock函数判断是否有期待的 I/O 事件发生,直到netpollblock返回true表示io ready才会走出循环。