client发起socket连接的时候,server监听了该端口,接收到client的连接请求,然后把建立练级的SocketChannel放入队列里面,交给SelectorThread处理
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection public boolean addAcceptedConnection(SocketChannel accepted) { if (stopped || !acceptedQueue.offer(accepted)) { return false; } wakeupSelector(); return true; } 建立session连接SelectorThread是一个不断循环的线程,每次循环都会处理刚刚建立的socket连接
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run while (!stopped) { try { select(); // 处理对立中的socket processAcceptedConnections(); processInterestOpsUpdateRequests(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } // org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections private void processAcceptedConnections() { SocketChannel accepted; while (!stopped && (accepted = acceptedQueue.poll()) != null) { SelectionKey key = null; try { // 向该socket注册读事件 key = accepted.register(selector, SelectionKey.OP_READ); // 创建一个NIOServerCnxn维护session NIOServerCnxn cnxn = createConnection(accepted, key, this); key.attach(cnxn); addCnxn(cnxn); // 省略中间代码... } }说了这么久,我们说的session究竟是什么还没有解释,session中文翻译是会话,在这里就是zk的server和client维护的一个具有一些特别属性的网络连接,网络连接这里就是socket连接,一些特别的属性包括
sessionId:唯一标示一个会话
sessionTimeout:这个连接的超时时间,超过这个时间server就会把连接断开
所以session建立的两步就是
建立socket连接
client发起建立session请求,server建立一个实例来维护这个连接
server收到ConnectRequest之后,按照正常处理io的方式处理这个request,server端的主要操作是
反序列化为ConnectRequest
根据request中的sessionId来判断是新的session连接还是session重连
如果是新连接
生成sessionId
创建新的SessionImpl并放入org.apache.zookeeper.server.SessionTrackerImpl#sessionExpiryQueue
封装该请求为新的request在processorChain中传递,最后交给FinalRequestProcessor处理
如果是重连
关闭sessionId对应的原来的session
关闭原来的socket连接
sessionImp会在sessionExpiryQueue中由于过期被清理
重新打开一个session
将原来的sessionId设置到当前的NIOServerCnxn实例中,作为新的连接的sessionId
校验密码是否正确密码错误的时候直接返回给客户端,不可用的session
密码正确的话,新建SessionImpl
返回给客户端sessionId
总体流程是
其中有一个session生成算法我们来看下
public static long initializeNextSession(long id) { // sessionId是long类型,共8个字节,64位 long nextSid; // 取时间戳的的低40位作为初始化sessionId的第16-55位,这里使用的是无符号右移,不会出现负数 nextSid = (Time.currentElapsedTime() << 24) >>> 8; // 使用serverId(配置文件中指定的myid)作为高8位 nextSid = nextSid | (id <<56); // nextSid为long的最小值,这中情况不可能出现,这里只是作为一个case列在这里 if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) { ++nextSid; // this is an unlikely edge case, but check it just in case } return nextSid; }初始化sessionId的组成
myid(1字节)+截取的时间戳低40位(5个字节)+2个字节(初始化都是0)
每个server再基于这个id不断自增,这样的算法就保证了每个server的sessionId是全局唯一的。
总结session在zk框架中是一个重要概念,很多功能都依赖于session,比如临时节点,session关闭后就自动删除了。本文主要介绍了session的建立过程中client和server各自的处理方式。