Linux select I/O 复用

在处理多个socket套接字的时候,会很自然的遇到一个问题:某个套接字什么时候可读?什么时候可写?哪些套接字是需要关闭的?我们可以回忆一下,一般我们在最开始编写socket程序的时候,send,recv都是同步的,send完后就傻等着recv。这种模式的一个很大的问题是,recv会占用一整个线程,单个线程里没法处理第二个socket。怎么办呢?加线程,每个socket分配一个线程?显然不合适,1000个客户端难道要1000个线程么。select提供了一种方式同时监控多个套接字,执行过程大致为:首先将感兴趣的套接字加入到select的集合中,select函数执行,监控这些个套接字,当其中有套接字产生了事件(可读,可写,异常)或者select超时后,select返回告知调用者,哪些个套接字发送了事件,调用者就对发生了事件的套接字挨个处理,然后继续执行select函数,下个循环开始。
通过这样的一种方式,在同一个线程里面就实现了对多个套接字的读写操作。当然需要注意的是,select调用针对的是文件描述符,不管是socket,pipe,file都是可以被select监控的。

函数说明 #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);

当前linux下selet的每个fd_set最多可以监控1024个文件描述符

参数

nfds 要监听的所有的文件描述符中最大值 + 1

readfds 要监听其读事件的文件描述符集合

writefds 要监听其写事件的文件描述符集合

exceptfds 要监听其异常事件的文件描述符集合(比如socket的带外数据就会引起exceptfds事件)

timeout 超时时间结构,指定为NULL表示一直阻塞,否则就阻塞指定的时间。

返回值

-1 出错

= 超时

>0 某些套接字产生事件

使用

假设对于一个文件描述符fd,我们想监控其读事件,就将加入到读监控集合中。

fd_set read_set; FD_ZERO(&read_set); FD_SET(fd, &read_set); select(maxFd +1, &read_set, NULL, NULL, NULL)

同理,对于写事件,异常事件也是一样的处理方式。

当select调用返回后,如果select返回>0 则可以对指定的文件描述符进行操作。

if (FD_ISSET(fd, &read_set)) { recv(fd) }

在程序设计中,一般将select结合while使用。

参考 实例

使用select构建的一个tcp echo程序,程序可以接受多个客户端的链接,并将客户端发送过来的字符串发送回去。

测试

测试环境:CentOS7.1

server端截图:

Linux select I/O 复用


cleint1截图:

Linux select I/O 复用


cleint2截图:

Linux select I/O 复用

代码 /* author: bymzy * 2017/2/12 * */ #include <sys/select.h> #include <sys/types.h> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <errno.h> #include <stdio.h> #include <strings.h> #include <stdlib.h> #define PORT 3333 #define IP "127.0.0.1" #define BACKLOG 5 #define BUFSIZE 1025 void SetFDSet(int *clientFd, int listenFd, fd_set *read, fd_set *except, int *maxFd) { int i = listenFd + 1; FD_ZERO(read); FD_ZERO(except); for (;i <= *maxFd; ++i) { if (clientFd[i] != 0) { FD_SET(clientFd[i], read); FD_SET(clientFd[i], except); } } } void CheckFDSet(int *clientFd, int listenFd, fd_set *read, fd_set *except, int *maxFd) { int i = listenFd + 1; int tempMaxFd = *maxFd; char buf[1024]; int recved = 0; bool needClose = false; bzero(buf, 1024); for (;i <= tempMaxFd; ++i) { bzero(buf, recved); if (FD_ISSET(clientFd[i], read)) { recved = recv(clientFd[i], buf, BUFSIZE -1 , 0); if (recved <= 0) { perror("Recv error"); close(clientFd[i]); if (i == tempMaxFd) { *maxFd = -1; } clientFd[i] = 0; continue; } printf("Recv: %s \n", buf); send(clientFd[i], buf, recved, 0); } bzero(buf, recved); if (FD_ISSET(clientFd[i], except)) { recved = recv(clientFd[i], buf, BUFSIZE - 1, MSG_OOB); if (recved <= 0) { perror("Recv error"); close(clientFd[i]); if (i == tempMaxFd) { *maxFd = -1; } clientFd[i] = 0; continue; } printf("OOB: %s \n", buf); send(clientFd[i], buf, recved, MSG_OOB); } } } void ChooseMaxFd(int *clientFd, int listenFd, int total, int *maxFd) { int i = listenFd; for (;i < total; ++i) { if(clientFd[i] != 0) { *maxFd = clientFd[i]; } } } void CloseFd(int *clientFd, int listenFd, int total) { int i = listenFd + 1; for (;i < total; ++i) { if(clientFd[i] != 0) { close(clientFd[i]); clientFd[i] = 0; } } } int main(int argc, char* argv[]) { int err = 0; int listenFd = -1; do { listenFd = socket(AF_INET, SOCK_STREAM, 0); if (listenFd < 0) { err = errno; perror("Create listen socket failed"); break; } struct sockaddr_in bindaddr; bzero(&bindaddr, 0); bindaddr.sin_addr.s_addr = inet_addr(IP); bindaddr.sin_port = htons(PORT); bindaddr.sin_family = AF_INET; socklen_t socklen = sizeof(bindaddr); int reuse = 1; setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); err = bind(listenFd, (sockaddr*)&bindaddr, socklen); if (err != 0) { err = errno; perror("Listen socket bind failed!"); break; } err = listen(listenFd, BACKLOG); if (err != 0) { err = errno; perror("Listen socket listen failed!"); break; } /* use select read data */ fd_set read_set; fd_set exception_set; struct timeval tv; tv.tv_sec = 5; tv.tv_usec = 0; int *clientFd = (int*)malloc(sizeof(int) * (FD_SETSIZE + listenFd + 1)); clientFd[listenFd] = listenFd; int maxFd = -1; while (1) { if (maxFd == -1) { ChooseMaxFd(clientFd, listenFd, FD_SETSIZE + listenFd + 1, &maxFd); } SetFDSet(clientFd, listenFd, &read_set, &exception_set, &maxFd); if (maxFd != (FD_SETSIZE + listenFd)) { FD_SET(listenFd, &read_set); } tv.tv_sec = 5; tv.tv_usec = 0; err = select(maxFd + 1, &read_set, NULL, &exception_set, &tv); if (err < 0) { perror("Select failed"); err = errno; break; } else if (err == 0) { printf("Select timeout!\n"); } else { if (FD_ISSET(listenFd, &read_set)) { struct sockaddr_in clientaddr; socklen_t clientlen; int tempFd= -1; tempFd = accept(listenFd, (sockaddr*)&clientaddr, &clientlen); if (tempFd < 0) { err = errno; perror("accept failed!"); break; } clientFd[tempFd] = tempFd; if (tempFd > maxFd) { maxFd = tempFd; } printf("Accept client fd: %d\n", tempFd); } CheckFDSet(clientFd, listenFd, &read_set, &exception_set, &maxFd); } } CloseFd(clientFd, listenFd, FD_SETSIZE + listenFd + 1); free(clientFd); } while(0); if (listenFd != -1) { close(listenFd); listenFd = -1; } return err; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/0963287a0e0379924b01ade72c9b0f81.html