case OpCode. getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream. byteBuffer2Record(request.request,
getDataRequest);
//获得znode对象
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
//n为null, 抛出NoNodeException异常
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclL;
synchronized(n) {
aclL = n. acl;
}
//检查是否有读权限
PrepRequestProcessor. checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms. READ,
request. authInfo);
//构建状态对象stat
Stat stat = new Stat();
//获得指定path的znode数据,
//如果GetDataRequest.getWatcher()返回true, 将ServerCnxn类型对象cnxn传递进去。
//ServerCnxn是实现了Watcher接口
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest. getWatch() ? cnxn : null);
//构建GetDataResponse对象
rsp = new GetDataResponse(b, stat);
break;
}
Code 4: DataTree.getData()方法
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
//从nodes map中获取指定path的DataNode对象
DataNode n = nodes.get(path);
//如果n为null, 则抛出NoNodeException异常
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
//将n的状态copy到stat中
n.copyStat(stat);
//如果watcher不会null, 则将(path, watcher)键值对放入dataWatchers Map里
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
//返回节点数据
return n.data ;
}
}
2. 修改znode数据触发Watcher
在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。
如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。
其关SetData请求的时序图如下:
triggerWatcher
Code 5: DataTree.setData()方法
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
//根据path, 获得DataNode对象n
DataNode n = nodes.get(path);
//如果n为null, 则抛出NoNodeException异常
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
lastdata = n. data;
n. data = data;
n. stat.setMtime(time);
n. stat.setMzxid(zxid);
n. stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length ));
}
//触发Watcher
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
Code 6: WatchManage.triggerWatcher()方法,触发Watcher。