Zookeeper源码分析:Watcher机制(2)

case OpCode. getData: {
              lastOp = "GETD";
              GetDataRequest getDataRequest = new GetDataRequest();
              ByteBufferInputStream. byteBuffer2Record(request.request,
                      getDataRequest);
              //获得znode对象
              DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
              //n为null, 抛出NoNodeException异常
              if (n == null) {
                  throw new KeeperException.NoNodeException();
              }
              Long aclL;
              synchronized(n) {
                  aclL = n. acl;
              }
              //检查是否有读权限
              PrepRequestProcessor. checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                      ZooDefs.Perms. READ,
                      request. authInfo);
              //构建状态对象stat
              Stat stat = new Stat();
              //获得指定path的znode数据,
              //如果GetDataRequest.getWatcher()返回true, 将ServerCnxn类型对象cnxn传递进去。
              //ServerCnxn是实现了Watcher接口
              byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                      getDataRequest. getWatch() ? cnxn : null);
              //构建GetDataResponse对象
              rsp = new GetDataResponse(b, stat);
              break;
          }


Code 4: DataTree.getData()方法

public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    //从nodes map中获取指定path的DataNode对象
    DataNode n = nodes.get(path);
    //如果n为null, 则抛出NoNodeException异常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        //将n的状态copy到stat中
        n.copyStat(stat);
        //如果watcher不会null, 则将(path, watcher)键值对放入dataWatchers Map里
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        //返回节点数据
        return n.data ;
    }
}


2. 修改znode数据触发Watcher
在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。

如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。

其关SetData请求的时序图如下:

Zookeeper源码分析:Watcher机制

triggerWatcher

Code 5: DataTree.setData()方法

public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    //根据path, 获得DataNode对象n
    DataNode n = nodes.get(path);
    //如果n为null, 则抛出NoNodeException异常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n. data;
        n. data = data;
        n. stat.setMtime(time);
        n. stat.setMzxid(zxid);
        n. stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
      this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
          - (lastdata == null ? 0 : lastdata.length ));
    }
    //触发Watcher
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}


Code 6: WatchManage.triggerWatcher()方法,触发Watcher。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/9495767d1bddd6b71058d21ae6294be3.html