详解Go语言I/O多路复用netpoller模型 (2)

这个TCP server中会调用net.Listen创建一个socket同时返回与之对应的fd,该fd用来初始化listener的netFD(go层面封装的网络文件描述符),接着调用 netFD的listenStream方法完成对 socket 的 bind&listen和netFD的初始化。

调用过程如下:

listen

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { // 创建一个socket s, err := sysSocket(family, sotype, proto) if err != nil { return nil, err } ... // 创建fd if fd, err = newFD(s, family, sotype, net); err != nil { poll.CloseFunc(s) return nil, err } if laddr != nil && raddr == nil { switch sotype { case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: // 调用 netFD的listenStream方法完成对 socket 的 bind&listen和netFD的初始化 if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil case syscall.SOCK_DGRAM: ... } } ... return fd, nil } func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) { ret := &netFD{ pfd: poll.FD{ Sysfd: sysfd, IsStream: sotype == syscall.SOCK_STREAM, ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW, }, family: family, sotype: sotype, net: net, } return ret, nil }

sysSocket方法会发起一个系统调用创建一个socket,newFD会创建一个netFD,然后调用netFD的listenStream方法进行bind&listen操作,并对netFD进行init。

netFD

netFD是一个文件描述符的封装,netFD中包含一个FD数据结构,FD中包含了Sysfd 和pollDesc两个重要的数据结构,Sysfd是sysSocket返回的socket系统文件描述符,pollDesc用于监控文件描述符的可读或者可写。

我们继续看listenStream:

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error { ... // 完成绑定操作 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } // 进行监听操作 if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { return os.NewSyscallError("listen", err) } // 初始化fd if err = fd.init(); err != nil { return err } lsa, _ = syscall.Getsockname(fd.pfd.Sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil }

listenStream方法会调用Bind方法完成fd的绑定操作,然后调用listenFunc进行监听,接着调用fd的init方法,完成FD、pollDesc初始化。

func (pd *pollDesc) init(fd *FD) error { // 调用到runtime.poll_runtime_pollServerInit serverInit.Do(runtime_pollServerInit) // 调用到runtime.poll_runtime_pollOpen ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) ... return nil }

runtime_pollServerInit用Once封装保证只能被调用一次,这个函数在Linux平台上会创建一个epoll文件描述符实例;

poll_runtime_pollOpen调用了netpollopen会将fd注册到 epoll实例中,并返回一个pollDesc;

netpollinit初始化 func poll_runtime_pollServerInit() { netpollGenericInit() } func netpollGenericInit() { if atomic.Load(&netpollInited) == 0 { lock(&netpollInitLock) if netpollInited == 0 { netpollinit() atomic.Store(&netpollInited, 1) } unlock(&netpollInitLock) } }

netpollGenericInit会调用平台上特定实现的netpollinit,在Linux中会调用到netpoll_epoll.go的netpollinit方法:

var ( epfd int32 = -1 // epoll descriptor ) func netpollinit() { // 创建一个新的 epoll 文件描述符 epfd = epollcreate1(_EPOLL_CLOEXEC) ... // 创建一个用于通信的管道 r, w, errno := nonblockingPipe() ... ev := epollevent{ events: _EPOLLIN, } *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd // 将读取数据的文件描述符加入监听 errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) ... netpollBreakRd = uintptr(r) netpollBreakWr = uintptr(w) }

调用epollcreate1方法会创建一个epoll文件描述符实例,需要注意的是epfd是一个全局的属性。然后创建一个用于通信的管道,调用epollctl将读取数据的文件描述符加入监听。

netpollopen加入事件监听

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsswds.html