/**
 * Fires every watcher registered on {@code path} with an event of the given type.
 *
 * <p>The watchers for the path are removed from {@code watchTable} (and the path is
 * removed from each watcher's entry in {@code watch2Paths}) while holding this
 * object's monitor; the watchers are then notified after the monitor is released.
 *
 * @param path    the path whose watchers should be triggered
 * @param type    the event type placed in the delivered {@code WatchedEvent}
 * @param supress watchers that must not be notified; may be {@code null}
 * @return the set of watchers that were registered on {@code path} (including any
 *         that were suppressed), or {@code null} if there were none
 */
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent event = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> triggered;
    synchronized (this) {
        // Detach the watchers registered for this path from watchTable.
        triggered = watchTable.remove(path);
        boolean nothingRegistered = triggered == null || triggered.isEmpty();
        if (nothingRegistered) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        // Keep the reverse index (watcher -> paths) consistent with watchTable.
        for (Watcher watcher : triggered) {
            HashSet<String> watchedPaths = watch2Paths.get(watcher);
            if (watchedPaths == null) {
                continue;
            }
            watchedPaths.remove(path);
        }
    }
    // Notify every watcher of this path, skipping suppressed ones. Per the original
    // author's note, the Watcher objects here are actually ServerCnxn instances.
    for (Watcher watcher : triggered) {
        boolean suppressed = supress != null && supress.contains(watcher);
        if (!suppressed) {
            watcher.process(event);
        }
    }
    return triggered;
}
Code 7: NIOServerCnxn.process方法,发送notification给Client端
/**
 * Sends the given watched event to the client as a "notification" response.
 *
 * @param event the event to deliver; it is converted to its wire representation
 *              via {@code getWrapper()} before being sent
 */
public synchronized void process(WatchedEvent event) {
    // NOTE(review): presumably the arguments are xid=-1, zxid=-1, err=0 — confirm
    // against the ReplyHeader constructor.
    ReplyHeader header = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        String traceMessage = "Deliver event " + event + " to 0x"
                + Long.toHexString(this.sessionId)
                + " through " + this;
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, traceMessage);
    }
    // Convert the WatchedEvent to a type that can be sent over the wire,
    // then send the notification to the client.
    sendResponse(header, event.getWrapper(), "notification");
}
3. 总结
Watcher具有one-time trigger的特性:从代码中可以看到,triggerWatch在触发事件时会先把该path对应的watcher从watchTable中移除,然后才调用process发送通知,因此同一个watcher只会被触发一次。
--------------------------------------分割线 --------------------------------------
Ubuntu 14.04安装分布式存储Sheepdog+ZooKeeper
CentOS 6安装sheepdog 虚拟机分布式储存