客户端与服务端的事件watcher源码阅读 (2)

我们直接去看FinalRequestProcessor的public void processRequest(Request request) {}方法,看他针对getData()方式的请求做出了哪些动作.下面来了个小高潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);跟进watcher的有无给服务端添加不同的Watcher

真的得划重点了,当我发现这一点时,我的心情是超级激动的,就像发现了新大陆一样

case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo); Stat stat = new Stat(); // todo 这里的操作 getDataRequest.getWatch() ? cnxn : null 对应可客户端的 跟进watcher有没有而决定往服务端传递 true 还是false 相关 // todo 跟进去 getData() byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); //todo cnxn的Processor()被回调, 往客户端发送数据 , 什么时候触发呢? 就是上面的 处理事务时的回调 第127行 // todo 构建了一个 rsp ,在本类的最后面将rsp 响应给client rsp = new GetDataResponse(b, stat); break; }

继续跟进这个getData()在服务端维护了一份path+watcher的map

public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { n.copyStat(stat); if (watcher != null) { // todo 将path 和 watcher 绑定在一起 dataWatches.addWatch(path, watcher); } return n.data; } } 客户端打开命令行,修改服务端node的状态

书接上回,当客户单的代码去创建ClientCnxn时,有下面的逻辑 , 它开启了两条守护线程, sendThread负责向服务端发送心跳,已经和服务端进行用户相关的IO交流, EventThread就负责和txn事务相关的处理逻辑,级别上升到针对node

// todo start就是启动了在构造方法中创建的线程 public void start() { sendThread.start(); eventThread.start(); }

到目前为止,客户端就有如下三条线程了

负责处理用户在控制台输入命令的主线程

守护线程1: seadThread

守护线程2: eventThread

跟进主线程的处理用户输入部分的逻辑代码如下:

下面的代码的主要逻辑就是处理用户输入的命令,当通过if-else选择分支判断用户到底输入的啥命令

按照我们的假定的场景,用户输入的命令是这样的 set /path newValue 所以,毫无疑问,经过解析后代码会去执行下面的stat = zk.setData(path, args[2].getBytes(),部分

// todo zookeeper客户端, 处理用户输入命令的具体逻辑 // todo 用大白话讲,下面其实就是把 从控制台获取的用户的输入信息转换成指定的字符, 然后发送到服务端 // todo MyCommandOptions 是处理命令行选项和shell脚本的工具类 protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { // todo 在这个方法中可以看到很多的命令行所支持的命令 Stat stat = new Stat(); // todo 获取命令行输入中 0 1 2 3 ... 位置的内容, 比如 0 位置是命令 1 2 3 位置可能就是不同的参数 String[] args = co.getArgArray(); String cmd = co.getCommand(); if (args.length < 1) { usage(); return false; } if (!commandMap.containsKey(cmd)) { usage(); return false; } boolean watch = args.length > 2; String path = null; List<ACL> acl = Ids.OPEN_ACL_UNSAFE; LOG.debug("Processing " + cmd); if (cmd.equals("quit")) { System.out.println("Quitting..."); zk.close(); System.exit(0); } else if (cmd.equals("set") && args.length >= 3) { path = args[1]; stat = zk.setData(path, args[2].getBytes(), args.length > 3 ? Integer.parseInt(args[3]) : -1); printStat(stat);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzdwsy.html