Zookeeper源码分析:Watcher机制

1. 设置Watcher
使用Watcher需要先实现Watcher接口,并将实现类对象传递到指定方法中,如getChildren, exist等。Zookeeper允许在构造Zookeeper对象时候指定一个默认Watcher对象.getChildren和exit方法可以使用这个默认的Watcher对象,也可以指定一个新Watcher对象。

Code 1: Watcher接口

public interface Watcher {

/**
    * Event的状态
    */
    public interface Event {
        /**
        * 在事件发生时,ZooKeeper的状态
        */
        public enum KeeperState {

@Deprecated
            Unknown (-1),

Disconnected (0),

@Deprecated
            NoSyncConnected (1),

SyncConnected (3),

AuthFailed (4),

ConnectedReadOnly (5),

SaslAuthenticated(6),

Expired (-112);

private final int intValue; 

KeeperState( int intValue) {
                this.intValue = intValue;
            } 

......
        }

/**
        * ZooKeeper中的事件
        */
        public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);

private final int intValue;    // Integer representation of value
                                            // for sending over wire
            EventType( int intValue) {
                this.intValue = intValue;
            }
            ...... 
        }
    }

//Watcher的回调方法
    abstract public void process(WatchedEvent event);
}


 

Code 2: Zookeeper.getChildren(final String, Watcher)方法

public List<String> getChildren(final String path, Watcher watcher)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils. validatePath(clientPath);

WatchRegistration wcb = null;
    //如果watcher不等于null, 构建WatchRegistration对象,
    //该对象描述了watcher和path之间的关系
    if (watcher != null) {
        wcb = new ChildWatchRegistration(watcher, clientPath);
    }
   
    //在传入的path加上root path前缀,构成服务器端的绝对路径
    final String serverPath = prependChroot(clientPath);
   
    //构建RequestHeader对象
    RequestHeader h = new RequestHeader();
    //设置操作类型为OpCode. getChildren
    h.setType(ZooDefs.OpCode. getChildren);
    //构建GetChildrenRequest对象
    GetChildrenRequest request = new GetChildrenRequest();
    //设置path
    request.setPath(serverPath);
    //设置是否使用watcher
    request.setWatch(watcher != null);
    //构建GetChildrenResponse对象
    GetChildrenResponse response = new GetChildrenResponse();
    //提交请求,并阻塞等待结果
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                clientPath);
    }
    return response.getChildren();
}


Follower的NIOServerCnxn类接到了Client的请求,会调用ZookeeperServer.processPacket()方法。该方法会构建一个Request对象,并调用第一个处理器FollowerRequestProcessor。

由于我们的请求只是一个读操作,而不是一个Quorum请求或者sync请求,所以FollowerRequestProcessor不需要调用Follower.request()方法将请求转给Leader,只需要将请求传递到下一个处理器CommitProcessor。

处理器CommitProcessor线程发现请求是读请求后,直接将Requet对象加入到toProcess队列中,在接下的循环中会调用FinalRequestProcessor.processRequest方法进行处理。

FinalRequestProcessor.processRequest方法最终会调用ZKDatabase中的读操作方法(如statNode和getData方法), 而ZKDatabase的这些方法会最终调用DataTree类的方法来获取指定path的znode信息并返回给Client端,同时也会设置Watcher。

Code 3: FinalRequestProcessor对OpCode.getData请求的处理

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/9495767d1bddd6b71058d21ae6294be3.html