zookeeper源码 — 四、session建立

session建立的主要过程

客户端发起连接

服务端创建session

session建立的主要过程

用一张图来说明session建立过程中client和server的交互

zookeeper源码 — 四、session建立

主要流程

服务端启动,客户端启动

客户端发起socket连接

服务端accept socket连接,socket连接建立

客户端发送ConnectRequest给server

server收到后初始化ServerCnxn,代表一个和客户端的连接,即session,server发送ConnectResponse给client

client处理ConnectResponse,session建立完成

客户发起连接 和server建立socket连接

客户端要发起连接要先启动,不论是使用curator client还是zkClient,初始化的都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper。

Zookeeper初始化的主要工作是初始化自己的一些关键组件

Watcher,外部构造好传入

初始化StaticHostProvider,决定客户端选择连接哪一个server

ClientCnxn,客户端网络通信的组件,主要启动逻辑就是启动这个类

ClientCnxn包含两个线程

SendThread,负责client端消息的发送和接收

EventThread,负责处理event

ClientCnxn初始化的过程就是初始化启动这两个线程,客户端发起连接的主要逻辑在SendThread线程中

// org.apache.zookeeper.ClientCnxn.SendThread#run @Override public void run() { clientCnxnSocket.introduce(this,sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = System.currentTimeMillis(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds while (state.isAlive()) { try { // client是否连接到server,如果没有连接到则连接server if (!clientCnxnSocket.isConnected()) { if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing if (closing || !state.isAlive()) { break; } // 这个里面去连接server startConnect(); clientCnxnSocket.updateLastSendAndHeard(); } // 省略中间代码... clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); // 省略中间代码... }

SendThread#run是一个while循环,只要client没有被关闭会一直循环,每次循环判断当前client是否连接到server,如果没有则发起连接,发起连接调用了startConnect

private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { // 通过hostProvider来获取一个server地址 addr = hostProvider.next(1000); } // 省略中间代码... // 建立client与server的连接 clientCnxnSocket.connect(addr); }

到这里client发起了socket连接,server监听的端口收到client的连接请求后和client建立连接。

通过一个request来建立session连接

socket连接建立后,client会向server发送一个ConnectRequest来建立session连接。两种情况会发送ConnectRequest

在上面的connect方法中会判断是否socket已经建立成功,如果建立成功就会发送ConnectRequest

如果socket没有立即建立成功(socket连接建立是异步的),则发送这个packet要延后到doTransport中

发送ConnectRequest是在下面的方法中

// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection void primeConnection() throws IOException { LOG.info("Socket connection established to " + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session"); isFirstConnect = false; long sessId = (seenRwServerBefore) ? sessionId : 0; ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); // 省略中间代码... // 将conReq封装为packet放入outgoingQueue等待发送 outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); } clientCnxnSocket.enableReadWriteOnly(); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request sent on " + clientCnxnSocket.getRemoteSocketAddress()); } }

请求中带的参数

lastZxid:上一个事务的id

sessionTimeout:client端配置的sessionTimeout

sessId:sessionId,如果之前建立过连接取的是上一次连接的sessionId

sessionPasswd:session的密码

服务端创建session 和client建立socket连接

在server启动的过程中除了会启动用于选举的网络组件还会启动用于处理client请求的网络组件

org.apache.zookeeper.server.NIOServerCnxnFactory

zookeeper源码 — 四、session建立

主要启动了三个线程:

AcceptThread:用于接收client的连接请求,建立连接后交给SelectorThread线程处理

SelectorThread:用于处理读写请求

ConnectionExpirerThread:检查session连接是否过期

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpzwxg.html