Quorum请求是转发给Leader处理,并且需要得一个Follower Quorum确认的请求。这些请求包括:
1)znode的写操作(OpCode.create,OpCode.delete,OpCode.setData,OpCode.setACL)
2)Session的创建和关闭操作(OpCode.createSession和OpCode.closeSession)
3)OpCode.multi操作。
本文分析了Client, Follower和Leader协同完成Quorum请求的过程。另外需注意的是OpCode.sync请求也需要转发给Leader, 但不需要得到一个Follower Quorum确认。本文也会提到OpCode.sync操作。
数据结构Request类型对象:Server内部传递的数据结构。
属性 说明sessionId 会话ID
cxid 客户端事务ID
type 操作类型, 如OpCode.setData
request 请求Record对象,如SetDataRequest
cnxn Server和Client端的连接对象
hdr 请求事务头TxnHeader
txn 请求事务体Record,如OpCode.setData请求,则是SetDataTxn类型对象
zxid ZooKeeper事务ID
authInfo 认证信息
createTime 创建时间
owner 所有者
e 处理过程中的异常
QuorumPacket类型对象:用于ZooKeeper服务器之间传递的数据包。
属性 说明type QuorumPacket类型,如Leader.REQUEST和Leader.ACK等
zxid ZooKeeper事务ID
data 数据包的数据:
在Leader.REQUEST中,数据依次如下:
Request.sessionId
Request.cxid
Request.type
Request.request
在Leader.PROPOSAL中,数据依次如下:
Request.hdr
Request.txn
在Leader.ACK中,为null
在Leader.COMMIT中,为null
authinfo 认证信息
Quorum请求流程
假设拓扑结构如下图,Client A和Follower A建立连接。
数据流程图如下。在图中,连接线说明前的数字表示事件发的生时序,主时序是直接使用一个数字表示,并且数字越小表示越早发生(如1 Client Request是在2 Request之前发生)。对于和主时序并发的操作使用主时序序号后加上一个括号括起来的数字表示,如7(1)-n Request指和7 Request是并发的。7(1)-n中n表示以7(1)开头的操作时序。
我们从数据流程图中Step 1讲起:Client A 发起一个Quorum请求给Follower A。
【Client A, Step 1】Client A调用Quorum请求对应的方法:
如调用Zookeeper的构造函数,会发起OpCode.createSession请求,
如调用Zookeeper.setData方法,会发起OpCode.setData操作。
最终会调用ClientCnxn.submitRequest方法将请求放入outgoingQueue队列中,并阻塞等待Follower A反馈。而ClientCnxn.SendThread线程会从outgoingQueue中取出请求,并发送给Follower A。
下面代码Zookeeper.setData方法: Client A构建对象发送给Follower A
public Stat setData( final String path, byte data[], int version)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils. validatePath(clientPath);
//通过传入的path构造完整serverPath
final String serverPath = prependChroot(clientPath);
//构造一个Request头
RequestHeader h = new RequestHeader();
//设置类型为setData
h.setType(ZooDefs.OpCode.setData);
//构造一个SetData请求体
SetDataRequest request = new SetDataRequest();
//设置需要修改node的serverPath
request.setPath(serverPath);
//设置需要修改的node的data
request.setData(data);
//设置需要修改的node的version
request.setVersion(version);
//构建SetDataResponse对象
SetDataResponse response = new SetDataResponse();