继续跟进stat = zk.setData(path, args[2].getBytes(), 下面的逻辑也很简单,就是将用户的输入封装进来request中,通过ClientCnxn类的submit方法提交到一个队列中,等待着sendThread去消费
这次有目的的看一下submitRequest的最后一个参数为null, 这个参数是WatchRegistration的位置,一开始置为null
public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.setData); SetDataRequest request = new SetDataRequest(); request.setPath(serverPath); request.setData(data); request.setVersion(version); SetDataResponse response = new SetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } return response.getStat(); }跟进这个submitRequest()方法, 源码如下,不处所料的是,它同样被阻塞住了,直到服务端给了它响应
当前代码的主要逻辑就是将request封装进packet,然后将packet添加到ClintCnxn维护的outgoingQueue队列中等待sendThread的消费
这次来到这个方法是因为我们在控制台输入的set 命令而触发的,比较重要的是本次packet携带的WatchRegistration==null, 毫无疑问,这次服务端在FinalRequestProcessor中再处理时取出的watcher==null, 也就不会将path+watcher保存进maptable中
重要:发送事务消息在FinalRequestProcessor的public void processRequest(Request request) {}方法中,有如下代码
//todo 请求头不为空 if (request.hdr != null) { // 获取请求头 TxnHeader hdr = request.hdr; // 获取事务 Record txn = request.txn; // todo 跟进这个方法-----<--!!!!!!-----处理事务的逻辑,在这里面有向客户端发送事件的逻辑, 回调客户端的watcher----!!!!!!--> rc = zks.processTxn(hdr, txn); }继续跟进去
// todo 处理事物日志 public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = hdr.getType(); long sessionId = hdr.getClientId(); // todo 继续跟进去!!!!!!!!! // todo 跟进 processTxn(hdr, txn) rc = getZKDatabase().processTxn(hdr, txn);跟进ZkDatabase.java中的processTxn(hdr, txn)方法
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { // todo 跟进 processTxn return dataTree.processTxn(hdr, txn); }跟进到DataTree.java
public ProcessTxnResult processTxn(TxnHeader header, Record txn) { ProcessTxnResult rc = new ProcessTxnResult(); try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; rc.multiResult = null; switch (header.getType()) { // todo 根据客客户端发送过来的type进行switch, case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; rc.path = createTxn.getPath(); // todo 跟进这个创建节点的方法 createNode( createTxn.getPath(),根据请求头的值,进而判断出走到那个switch的分支,当前我们在控制台触发,进入到setData分支如下:跟进这个方法中可以看到它主要做了如下几件事
使用传递进来的新值替代旧data
dataWatches.triggerWatch(path, EventType.NodeDataChanged);触发指定的事件watch,什么事件呢? NodeDataChange, 触发了哪个watcher呢? 跟进去查看
//todo setData public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { // todo 修改内存的数据 lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } // todo 终于 看到了 服务端 关于触发NodeDataChanged的事件 dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }