Python 3下基于select模型的事件驱动机制程序(2)

这种模型的特征在于每一个执行周期都会探测一次或一组事件,一个特定的事件会触发某个特定的响应。我们可以将这种模型归类为“事件驱动模型”。
    相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
    但这个模型依旧有着很多问题。首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
    其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。如下例,庞大的执行体1的将直接导致响应事件2的执行体迟迟得不到执行,并在很大程度上降低了事件探测的及时性。

Python 3下基于select模型的事件驱动机制程序

图10 庞大的执行体对使用select()的事件驱动模型的影响

幸运的是,有很多高效的事件驱动库可以屏蔽上述的困难,常见的事件驱动库有libevent库,还有作为libevent替代者的libev库。这些库会根据操作系统的特点选择最合适的事件探测接口,并且加入了信号(signal) 等技术以支持异步响应,这使得这些库成为构建事件驱动模型的不二选择。下章将介绍如何使用libev库替换select或epoll接口,实现高效稳定的服务器模型。

实际上,Linux内核从2.6开始,也引入了支持异步响应的IO操作,如aio_read, aio_write,这就是异步IO。

Python下则是将其封装了, 对返回值做了修改, 相比较原来在C下的返回值(一个整型, 判断是否调用成功), python下的调用返回值则是直接返回的可读, 可写, 异常状态序列。C中的可读, 可写, 异常状态的序列, 则是直接将其写入了参数里面, 也就是说输入输出参数都是一样的, python这样的封装设计还是很不错的。我设计了一个粗陋的基于事件机制的select调用:

服务器端:

import select import socket import queue from time import sleep class TCPServer: def __init__(self, server, server_address, inputs, outputs, message_queues): # Create a TCP/IP self.server = server self.server.setblocking(False) # Bind the socket to the port self.server_address = server_address print('starting up on %s port %s' % self.server_address) self.server.bind(self.server_address) # Listen for incoming connections self.server.listen(5) # Sockets from which we expect to read self.inputs = inputs # Sockets to which we expect to write # 处理要发送的消息 self.outputs = outputs # Outgoing message queues (socket: Queue) self.message_queues = message_queues def handler_recever(self, readable): # Handle inputs # 循环判断是否有客户端连接进来, 当有客户端连接进来时select 将触发 for s in readable: # 判断当前触发的是不是服务端对象, 当触发的对象是服务端对象时,说明有新客户端连接进来了 # 表示有新用户来连接 if s is self.server: # A "readable" socket is ready to accept a connection connection, client_address = s.accept() self.client_address = client_address print('connection from', client_address) # this is connection not server connection.setblocking(0) # 将客户端对象也加入到监听的列表中, 当客户端发送消息时 select 将触发 self.inputs.append(connection) # Give the connection a queue for data we want to send # 为连接的客户端单独创建一个消息队列,用来保存客户端发送的消息 self.message_queues[connection] = queue.Queue() else: # 有老用户发消息, 处理接受 # 由于客户端连接进来时服务端接收客户端连接请求,将客户端加入到了监听列表中(input_list), 客户端发送消息将触发 # 所以判断是否是客户端对象触发 data = s.recv(1024) # 客户端未断开 if data != b'': # A readable client socket has data print('received "%s" from %s' % (data, s.getpeername())) # 将收到的消息放入到相对应的socket客户端的消息队列中 self.message_queues[s].put(data) # Add output channel for response # 将需要进行回复操作socket放到output 列表中, 让select监听 if s not in self.outputs: self.outputs.append(s) else: # 客户端断开了连接, 将客户端的监听从input列表中移除 # Interpret empty result as closed connection print('closing ', s.getpeername()) # 获取客户端的socket信息 # Stop listening for input on the connection if s in self.outputs: self.outputs.remove(s) self.inputs.remove(s) s.close() # Remove message queue # 移除对应socket客户端对象的消息队列 del self.message_queues[s] return "got it" def handler_send(self, writable): # Handle outputs # 如果现在没有客户端请求, 也没有客户端发送消息时, 开始对发送消息列表进行处理, 是否需要发送消息 # 存储哪个客户端发送过消息 for s in writable: try: # 如果消息队列中有消息,从消息队列中获取要发送的消息 message_queue = self.message_queues.get(s) send_data = '' if message_queue is not None: send_data = message_queue.get_nowait() except queue.Empty: # 客户端连接断开了 self.outputs.remove(s) else: # print "sending %s to %s " % (send_data, s.getpeername) # print "send something" if message_queue is not None: s.send(send_data) else: print("client has closed") # del message_queues[s] # writable.remove(s) # print "Client %s disconnected" % (client_address) return "got it" def handler_exception(self, exceptional): # # Handle "exceptional conditions" # 处理异常的情况 for s in exceptional: print('exception condition on', s.getpeername()) # Stop listening for input on the connection self.inputs.remove(s) if s in self.outputs: self.outputs.remove(s) s.close() # Remove message queue del self.message_queues[s] return "got it" def event_loop(tcpserver, inputs, outputs): while inputs: # Wait for at least one of the sockets to be ready for processing print('waiting for the next event') # 开始select 监听, 对input_list 中的服务器端server 进行监听 # 当socket调用send, recv等函数时, 就会再次调用此函数, 这时返回的第二个参数就会有值 readable, writable, exceptional = select.select(inputs, outputs, inputs) if readable is not None: tcp_recever = tcpserver.handler_recever(readable) if tcp_recever == 'got it': print("server have received") if writable is not None: tcp_send = tcpserver.handler_send(writable) if tcp_send == 'got it': print("server have send") if exceptional is not None: tcp_exception = tcpserver.handler_exception(exceptional) if tcp_exception == 'got it': print("server have exception") sleep(0.8) if __name__ == '__main__': server_address = ('localhost', 8090) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) inputs = [server] outputs = [] message_queues = {} tcpserver = TCPServer(server, server_address, inputs, outputs, message_queues) # 开启事件循环 event_loop(tcpserver, inputs, outputs) 

客户端:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/c5f837210c17a59fc7d32698dbf2a7f6.html