1. 设置Watcher
使用Watcher需要先实现Watcher接口,并将实现类对象传递到指定方法中,如getChildren, exist等。Zookeeper允许在构造Zookeeper对象时候指定一个默认Watcher对象.getChildren和exit方法可以使用这个默认的Watcher对象,也可以指定一个新Watcher对象。
Code 1: Watcher接口
public interface Watcher {
/**
* Event的状态
*/
public interface Event {
/**
* 在事件发生时,ZooKeeper的状态
*/
public enum KeeperState {
@Deprecated
Unknown (-1),
Disconnected (0),
@Deprecated
NoSyncConnected (1),
SyncConnected (3),
AuthFailed (4),
ConnectedReadOnly (5),
SaslAuthenticated(6),
Expired (-112);
private final int intValue;
KeeperState( int intValue) {
this.intValue = intValue;
}
......
}
/**
* ZooKeeper中的事件
*/
public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4);
private final int intValue; // Integer representation of value
// for sending over wire
EventType( int intValue) {
this.intValue = intValue;
}
......
}
}
//Watcher的回调方法
abstract public void process(WatchedEvent event);
}
Code 2: Zookeeper.getChildren(final String, Watcher)方法
public List<String> getChildren(final String path, Watcher watcher)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils. validatePath(clientPath);
WatchRegistration wcb = null;
//如果watcher不等于null, 构建WatchRegistration对象,
//该对象描述了watcher和path之间的关系
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
//在传入的path加上root path前缀,构成服务器端的绝对路径
final String serverPath = prependChroot(clientPath);
//构建RequestHeader对象
RequestHeader h = new RequestHeader();
//设置操作类型为OpCode. getChildren
h.setType(ZooDefs.OpCode. getChildren);
//构建GetChildrenRequest对象
GetChildrenRequest request = new GetChildrenRequest();
//设置path
request.setPath(serverPath);
//设置是否使用watcher
request.setWatch(watcher != null);
//构建GetChildrenResponse对象
GetChildrenResponse response = new GetChildrenResponse();
//提交请求,并阻塞等待结果
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code. get(r.getErr()),
clientPath);
}
return response.getChildren();
}
Follower的NIOServerCnxn类接到了Client的请求,会调用ZookeeperServer.processPacket()方法。该方法会构建一个Request对象,并调用第一个处理器FollowerRequestProcessor。
由于我们的请求只是一个读操作,而不是一个Quorum请求或者sync请求,所以FollowerRequestProcessor不需要调用Follower.request()方法将请求转给Leader,只需要将请求传递到下一个处理器CommitProcessor。
处理器CommitProcessor线程发现请求是读请求后,直接将Requet对象加入到toProcess队列中,在接下的循环中会调用FinalRequestProcessor.processRequest方法进行处理。
FinalRequestProcessor.processRequest方法最终会调用ZKDatabase中的读操作方法(如statNode和getData方法), 而ZKDatabase的这些方法会最终调用DataTree类的方法来获取指定path的znode信息并返回给Client端,同时也会设置Watcher。
Code 3: FinalRequestProcessor对OpCode.getData请求的处理