开始之前咱们通过非阻塞IO引入一下:(来个简单例子socket.setblocking(False))
import time import socket def select(socket_addr_list): for client_socket, client_addr in socket_addr_list: try: data = client_socket.recv(2048) if data: print(f"[来自{client_addr}的消息:]\n") print(data.decode("utf-8")) client_socket.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: # 没有消息是触发异常,空消息是断开连接 client_socket.close() # 关闭客户端连接 socket_addr_list.remove((client_socket, client_addr)) print(f"[客户端{client_addr}已断开连接,当前连接数:{len(socket_addr_list)}]") except Exception: pass def main(): # 存放客户端集合 socket_addr_list = list() with socket.socket() as tcp_server: # 防止端口绑定的设置 tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() tcp_server.setblocking(False) # 服务端非阻塞 while True: try: client_socket, client_addr = tcp_server.accept() client_socket.setblocking(False) # 客户端非阻塞 socket_addr_list.append((client_socket, client_addr)) except Exception: pass else: print(f"[来自{client_addr}的连接,当前连接数:{len(socket_addr_list)}]") # 防止客户端断开后出错 if socket_addr_list: # 轮询查看客户端有没有消息 select(socket_addr_list) # 引用传参 time.sleep(0.01) if __name__ == "__main__": main()输出:
可以思考下:
为什么Server也要设置为非阻塞?
PS:一个线程里面只能有一个死循环,现在程序需要两个死循环,so ==> 放一起咯
断开连接怎么判断?
PS:没有消息是触发异常,空消息是断开连接
client_socket为什么不用dict存放?
PS:dict在循环的过程中,del会引发异常
1.Selectselect和上面的有点类似,就是轮询的过程交给了操作系统:
kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程
来个和上面等同的案例:
import select import socket def main(): with socket.socket() as tcp_server: tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() socket_info_dict = dict() socket_list = [tcp_server] # 监测列表 while True: # 劣势:select列表数量有限制 read_list, write_list, error_list = select.select( socket_list, [], []) for item in read_list: # 服务端迎接新的连接 if item == tcp_server: client_socket, client_address = item.accept() socket_list.append(client_socket) socket_info_dict[client_socket] = client_address print(f"[{client_address}已连接,当前连接数:{len(socket_list)-1}]") # 客户端发来 else: data = item.recv(2048) if data: print(data.decode("utf-8")) item.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: item.close() socket_list.remove(item) info = socket_info_dict[item] print(f"[{info}已断开,当前连接数:{len(socket_list)-1}]") if __name__ == "__main__": main()输出和上面一样
扩展说明:
select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪函数返回(有数据可读、可写、或者有except)或者超时(timeout指定等待时间,如果立即返回设为null即可)
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024(64位=>2048)
然后Poll就出现了,就是把上限给去掉了,本质并没变,还是使用的轮询
2.EPollepoll在内核2.6中提出(Linux独有),使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,采用监听回调的机制,这样在用户空间和内核空间的copy只需一次,避免再次遍历就绪的文件描述符列表
先来看个案例吧:(输出和上面一样)
import socket import select def main(): with socket.socket() as tcp_server: tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() # epoll是linux独有的 epoll = select.epoll() # tcp_server注册到epoll中 epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET) # key-value fd_socket_dict = dict() # 回调需要自己处理 while True: # 返回可读写的socket fd 集合 poll_list = epoll.poll() for fd, event in poll_list: # 服务器的socket if fd == tcp_server.fileno(): client_socket, client_addr = tcp_server.accept() fd = client_socket.fileno() fd_socket_dict[fd] = (client_socket, client_addr) # 把客户端注册进epoll中 epoll.register(fd, select.EPOLLIN | select.EPOLLET) else: # 客户端 client_socket, client_addr = fd_socket_dict[fd] data = client_socket.recv(2048) print( f"[来自{client_addr}的消息,当前连接数:{len(fd_socket_dict)}]\n") if data: print(data.decode("utf-8")) client_socket.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: del fd_socket_dict[fd] print( f"[{client_addr}已离线,当前连接数:{len(fd_socket_dict)}]\n" ) # 从epoll中注销 epoll.unregister(fd) client_socket.close() if __name__ == "__main__": main()扩展:epoll的两种工作模式