void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type) { int err; uv__handle_init(loop, (uv_handle_t*)stream, type); stream->read_cb = NULL; stream->alloc_cb = NULL; stream->close_cb = NULL; stream->connection_cb = NULL; stream->connect_req = NULL; stream->shutdown_req = NULL; stream->accepted_fd = -1; stream->queued_fds = NULL; stream->delayed_error = 0; QUEUE_INIT(&stream->write_queue); QUEUE_INIT(&stream->write_completed_queue); stream->write_queue_size = 0; if (loop->emfile_fd == -1) { err = uv__open_cloexec("/dev/null", O_RDONLY); if (err < 0) /* In the rare case that "/dev/null" isn't mounted open "https://www.jb51.net/" * instead. */ err = uv__open_cloexec("https://www.jb51.net/", O_RDONLY); if (err >= 0) loop->emfile_fd = err; } #if defined(__APPLE__) stream->select = NULL; #endif /* defined(__APPLE_) */ // 初始化io观察者 uv__io_init(&stream->io_watcher, uv__stream_io, -1); } void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) { assert(cb != NULL); assert(fd >= -1); // 初始化队列,回调,需要监听的fd QUEUE_INIT(&w->pending_queue); QUEUE_INIT(&w->watcher_queue); w->cb = cb; w->fd = fd; w->events = 0; w->pevents = 0; #if defined(UV_HAVE_KQUEUE) w->rcount = 0; w->wcount = 0; #endif /* defined(UV_HAVE_KQUEUE) */ }
从代码可以知道,只是对uv_tcp_t结构体做了一些初始化操作。到这,new TCP的逻辑就执行完毕了。接下来就是继续分类nodejs里调用bind和listen的逻辑。nodejs的bind对应libuv的函数是uv__tcp_bind,listen对应的是uv_tcp_listen。
先看一个bind的核心代码。
/* Cannot set IPv6-only mode on non-IPv6 socket. */ if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6) return UV_EINVAL; // 获取一个socket并且设置某些标记 err = maybe_new_socket(tcp, addr->sa_family, 0); if (err) return err; on = 1; // 设置在端口可重用 if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) return UV__ERR(errno); bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) { struct sockaddr_storage saddr; socklen_t slen; if (domain == AF_UNSPEC) { handle->flags |= flags; return 0; } return new_socket(handle, domain, flags); } static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) { struct sockaddr_storage saddr; socklen_t slen; int sockfd; int err; // 获取一个socket err = uv__socket(domain, SOCK_STREAM, 0); if (err < 0) return err; sockfd = err; // 设置选项和保存socket的文件描述符到io观察者中 err = uv__stream_open((uv_stream_t*) handle, sockfd, flags); if (err) { uv__close(sockfd); return err; } ... return 0; } int uv__stream_open(uv_stream_t* stream, int fd, int flags) { if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) return UV_EBUSY; assert(fd >= 0); stream->flags |= flags; if (stream->type == UV_TCP) { if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) return UV__ERR(errno); /* TODO Use delay the user passed in. */ if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60)) { return UV__ERR(errno); } } ... // 保存socket对应的文件描述符到io观察者中,libuv会在io poll阶段监听该文件描述符 stream->io_watcher.fd = fd; return 0; }
上面的一系列操作主要是新建一个socket文件描述符,设置一些flag,然后把文件描述符保存到IO观察者中,libuv在poll IO阶段会监听该文件描述符,如果有事件到来,会执行设置的回调函数,该函数是在uv__stream_init里设置的uv__stream_io。最后执行bind函数进行绑定操作。最后我们来分析一下listen函数。首先看下tcp_wrapper.cc的代码。
void TCPWrap::Listen(const FunctionCallbackInfo<Value>& args) { TCPWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder(), args.GetReturnValue().Set(UV_EBADF)); int backlog = args[0]->Int32Value(); int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_), backlog, OnConnection); args.GetReturnValue().Set(err); }