以下是DataWatcher.java类的代码:
import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; public class DataWatcher implements Watcher, Runnable { private static String hostPort = "localhost:2181"; private static String zooDataPath = "/MyConfig"; byte zoo_data[] = null; ZooKeeper zk; public DataWatcher() { try { zk = new ZooKeeper(hostPort, 2000, this); if (zk != null) { try { //Create the znode if it doesn't exist, with the following code: if (zk.exists(zooDataPath, this) == null) { zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } } catch (IOException e) { e.printStackTrace(); } } public void printData() throws InterruptedException, KeeperException { zoo_data = zk.getData(zooDataPath, this, null); String zString = new String(zoo_data); // The following code prints the current content of the znode to the console: System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, zString); } @Override public void process(WatchedEvent event) { System.out.printf( "\nEvent Received: %s", event.toString()); //We will process only events of type NodeDataChanged if (event.getType() == Event.EventType.NodeDataChanged) { try { printData(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException, KeeperException { DataWatcher dataWatcher = new DataWatcher(); dataWatcher.printData(); dataWatcher.run(); } public void run() { try { synchronized (this) { while (true) { wait(); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }我们来看一下DataWatcher.java类的代码来理解一个ZooKeeper监视器的实现。 DataWatcher公共类实现Watcher接口以及Runnable接口,打算将监视器作为线程运行。 main方法创建DataWatcher类的一个实例。 在前面的代码中,DataWatcher构造方法尝试连接到在本地主机上运行的ZooKeeper实例。 如果连接成功,我们用下面的代码检查znode路径/MyConfig是否存在:
if (zk.exists(zooDataPath, this) == null) {如果znode不存在ZooKeeper命名空间中,那么exists方法调用将返回null,并且尝试使用代码将其创建为持久化znode,如下所示:
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);接下来是process方法,它在org.apache.ZooKeeper的Watcher接口中声明,并由DataWatcher类使用以下代码实现:
public void process(WatchedEvent event) {为了简单起见,在process方法中,打印从ZooKeeper实例接收的事件,并仅对NodeDataChanged类型的事件进行进一步处理,如下所示:
if (event.getType() == Event.EventType.NodeDataChanged)当znode路径/MyConfig的数据字段发生任何更新或更改而收到NodeDataChanged类型的事件时,调用printData方法来打印znode的当前内容。 在znode上执行一个getData调用时,我们再次设置一个监视,这是该方法的第二个参数,如下面的代码所示:
zoo_data = zk.getData(zooDataPath, this, null);监视事件是发送给设置监视的客户端的一次性触发器,为了不断接收进一步的事件通知,客户端应该重置监视器。
DataUpdater.java是一个简单的类,它连接到运行本地主机的ZooKeeper实例,并用随机字符串更新znode路径/MyConfig的数据字段。 在这里,我们选择使用通用唯一标识符(UUID)字符串更新znode,因为后续的UUID生成器调用将保证生成唯一的字符串。
DataUpdater.java类代码如下:
import java.io.IOException; import java.util.UUID; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class DataUpdater implements Watcher { private static String hostPort = "localhost:2181"; private static String zooDataPath = "/MyConfig"; ZooKeeper zk; public DataUpdater() throws IOException { try { zk = new ZooKeeper(hostPort, 2000, this); } catch (IOException e) { e.printStackTrace(); } } // updates the znode path /MyConfig every 5 seconds with a new UUID string. public void run() throws InterruptedException, KeeperException { while (true) { String uuid = UUID.randomUUID().toString(); byte zoo_data[] = uuid.getBytes(); zk.setData(zooDataPath, zoo_data, -1); try { Thread.sleep(5000); // Sleep for 5 secs } catch(InterruptedException e) { Thread.currentThread().interrupt(); } } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DataUpdater dataUpdater = new DataUpdater(); dataUpdater.run(); } @Override public void process(WatchedEvent event) { System.out.printf("\nEvent Received: %s", event.toString()); } }