Reading the Client-Side and Server-Side Event Watcher Source Code (4)

Step into dataWatches.triggerWatch(path, EventType.NodeDataChanged);. The source is below. The main logic is to take the watchers stored on the server side out of the watch table and invoke each one's process() callback. So which watchers are they? Exactly the ones that were stored when the client registered them via getData() at startup — that is, the ServerCnxn objects (ServerCnxn implements Watcher).

```java
// todo: step into the server-side event trigger. Oddly, there is no logic
// here that sends data to the client -- that happens in the Watcher
// callbacks fired at the bottom
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // Watches are one-shot: remove() takes them out of the table before firing
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // todo: keep following this to see how the callback is made
        w.process(e);
    }
    return watchers;
}
```
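Note the watchTable.remove(path) near the top: the watchers are removed before they are fired, which is why ZooKeeper watches are one-shot and must be re-registered after each notification. Below is a minimal client-side sketch of that behavior (assuming a ZooKeeper server on localhost:2181; the /demo znode and its values are made-up names for illustration):

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class OneShotWatchDemo {
    public static void main(String[] args) throws Exception {
        // No-op default watcher; we only care about the getData() watch
        ZooKeeper zk = new ZooKeeper("localhost:2181", 15000, event -> {});
        if (zk.exists("/demo", false) == null) {
            zk.create("/demo", "v0".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        Watcher w = event -> System.out.println("fired: " + event);
        zk.getData("/demo", w, null);              // stores the watch on the server
        zk.setData("/demo", "v1".getBytes(), -1);  // fires NodeDataChanged once
        zk.setData("/demo", "v2".getBytes(), -1);  // no event: the watch is gone
        Thread.sleep(1000);
        zk.close();
    }
}
```

Running it prints a single NodeDataChanged event; the second setData() fires nothing because the first notification already consumed the watch.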

With some excitement, let's go see what ServerCnxn's process() method does.

Coming to NIOServerCnxn, the implementation class of ServerCnxn, we can indeed see the server pushing a notification message to the client. Note that the first argument of new ReplyHeader(-1, -1L, 0) is -1. This matters: when the client receives a response marked with xid = -1, it knows it is a watch notification and hands it to the EventThread for processing.

```java
@Override
synchronized public void process(WatchedEvent event) {
    // xid = -1 marks this response as a notification on the client side
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "Deliver event " + event + " to 0x"
                + Long.toHexString(this.sessionId) + " through " + this);
    }
    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();
    // todo: send the WatcherEvent e to the client
    sendResponse(h, e, "notification");
}
```
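For readers curious about what actually crosses the wire, here is a rough sketch of the serialization that sendResponse() performs. This is a simplification built only from the jute records used above, not the real method body (the real one also writes a 4-byte length prefix and pushes the buffer through the NIO channel):

```java
import java.io.ByteArrayOutputStream;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;

public class NotificationWireSketch {
    public static void main(String[] args) throws Exception {
        // Header first: xid = -1 is what the client branches on
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        WatcherEvent e = new WatcherEvent(
                EventType.NodeDataChanged.getIntValue(),
                KeeperState.SyncConnected.getIntValue(),
                "/demo");
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        h.serialize(boa, "header");        // ReplyHeader, then
        e.serialize(boa, "notification");  // the WatcherEvent payload
        System.out.println("payload bytes: " + baos.size());
    }
}
```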

Now enter the read-ready handling in the client's SendThread, shown below. From header.xid == -1 it knows this response is a watch notification:

```java
// todo: an event pushed by the server. The client stores it in the
// EventThread's waitingEvents queue, and another thread keeps consuming
// that queue
if (replyHdr.getXid() == -1) {
    // -1 means notification
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got notification sessionid:0x"
                + Long.toHexString(sessionId));
    }
    // todo: create a WatcherEvent and deserialize the data sent back by
    // the server into it
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");

    // convert from a server path to a client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0)
            event.setPath("/");
        else if (serverPath.length() > chrootPath.length())
            event.setPath(serverPath.substring(chrootPath.length()));
        else {
            LOG.warn("Got server path " + event.getPath()
                    + " which is too short for chroot path "
                    + chrootPath);
        }
    }

    WatchedEvent we = new WatchedEvent(event);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got " + we + " for sessionid 0x"
                + Long.toHexString(sessionId));
    }
    // todo: follow this call
    eventThread.queueEvent(we);
    return;
}
```
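The chroot branch above is easy to miss. As a standalone illustration, toClientPath below is a hypothetical helper (not a ZooKeeper API) that mirrors the same translation: the server always reports full paths, so a client created with a chroot suffix strips it before the event is queued:

```java
public class ChrootSketch {
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) return serverPath;      // no chroot configured
        if (serverPath.equals(chrootPath)) return "/";  // the chroot root itself
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath; // too short for the chroot: the original logs a warning
    }

    public static void main(String[] args) {
        // e.g. a client built with "localhost:2181/app1" sees /app1/node as /node
        System.out.println(toClientPath("/app1/node", "/app1")); // prints /node
        System.out.println(toClientPath("/app1", "/app1"));      // prints /
    }
}
```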

At the end of readResponse(), the notification is added to the queue consumed by the EventThread. Follow eventThread.queueEvent(we);:

```java
public void queueEvent(WatchedEvent event) {
    // todo: if the event type is None and the session state is unchanged,
    // return immediately
    /*
     * todo: event types are modeled as the enum in the Watcher interface:
     *   None (-1),
     *   NodeCreated (1),
     *   NodeDeleted (2),
     *   NodeDataChanged (3),
     *   NodeChildrenChanged (4)
     */
    if (event.getType() == EventType.None
            && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();

    // materialize the watchers based on the event
    // todo: WatcherSetEventPair is a helper class of ClientCnxn that bundles
    // the watchers together with the event they observed
    WatcherSetEventPair pair = new WatcherSetEventPair(
            // todo: follow this materialize() call. It pulls the full set of
            // watchers associated with this client out of the maps
            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
            event);

    // queue the pair (watch set & event) for later processing.
    // How is it processed? It is consumed in the EventThread's run() loop
    // todo: waitingEvents ==> LinkedBlockingQueue<Object>
    waitingEvents.add(pair);
}
```
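The interesting call is watcher.materialize(...). As a simplified sketch (not the real ZKWatchManager code), here is roughly what it does for a NodeDataChanged event: it removes and returns the watcher sets registered for that path, so client-side watches are consumed on delivery just like the server-side ones. The real method also handles None, NodeCreated, NodeDeleted, and NodeChildrenChanged against their own maps:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.zookeeper.Watcher;

class MaterializeSketch {
    static Set<Watcher> materializeDataChanged(
            Map<String, Set<Watcher>> dataWatches,
            Map<String, Set<Watcher>> existWatches,
            String clientPath) {
        Set<Watcher> result = new HashSet<>();
        synchronized (dataWatches) {
            // remove(), not get(): the watch will not fire a second time
            Set<Watcher> ws = dataWatches.remove(clientPath);
            if (ws != null) result.addAll(ws);
        }
        synchronized (existWatches) {
            // exists() watches also see data changes on the node
            Set<Watcher> ws = existWatches.remove(clientPath);
            if (ws != null) result.addAll(ws);
        }
        return result;
    }
}
```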

The queueEvent() code mainly does the following:

It takes all watchers related to the current event out of the client-side watch maps.

It adds the watcher set, paired with the event, to the waitingEvents queue, where it waits for the EventThread to consume it (see the sketch below).
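To close the loop, here is a minimal sketch of that consuming side. Pair is a hypothetical stand-in for ClientCnxn's WatcherSetEventPair, and the real EventThread additionally handles async-callback packets and a shutdown marker; the point is simply that one dedicated thread drains waitingEvents and invokes process() on every materialized watcher:

```java
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

class EventLoopSketch extends Thread {
    // Hypothetical stand-in for ClientCnxn's WatcherSetEventPair
    static class Pair {
        final Set<Watcher> watchers;
        final WatchedEvent event;
        Pair(Set<Watcher> watchers, WatchedEvent event) {
            this.watchers = watchers;
            this.event = event;
        }
    }

    final LinkedBlockingQueue<Pair> waitingEvents = new LinkedBlockingQueue<>();

    @Override
    public void run() {
        try {
            while (true) {
                Pair pair = waitingEvents.take(); // blocks until queueEvent() adds one
                for (Watcher w : pair.watchers) {
                    w.process(pair.event); // user callbacks run on this single thread
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```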
