为实现这个集群监控模型,我们将开发两个Java类。 ClusterMonitor类将持续运行监视器,以监视ZooKeeper树中的路径/Members。 处理完引发事件后,我们将在控制台中打印znode列表并重置监视。 另一个类ClusterClient将启动到ZooKeeper服务器的连接,在/Members下创建一个ephemeral znode。
要模拟具有多个节点的集群,我们在同一台计算机上启动多个客户端,并使用客户端进程的进程ID创建ephemeral znode。 通过查看进程标识,ClusterMonitor类可以确定哪个客户进程已经关闭,哪些进程还在。 在实际情况中,客户端进程通常会使用当前正在运行的服务器的主机名创建ephemeral znode。 下面显示了这两个类的源代码。
ClusterMonitor.java类定义如下:
import java.io.IOException; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class ClusterMonitor implements Runnable { private static String membershipRoot = "/Members"; private final Watcher connectionWatcher; private final Watcher childrenWatcher; private ZooKeeper zk; boolean alive=true; public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException { connectionWatcher = new Watcher() { @Override public void process(WatchedEvent event) { if(event.getType()==Watcher.Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.printf("\nEvent Received: %s", event.toString()); } } }; childrenWatcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.printf("\nEvent Received: %s", event.toString()); if (event.getType() == Event.EventType.NodeChildrenChanged) { try { //Get current list of child znode, //reset the watch List<String> children = zk.getChildren( membershipRoot, this); wall("!!!Cluster Membership Change!!!"); wall("Members: " + children); } catch (KeeperException e) { throw new RuntimeException(e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); alive = false; throw new RuntimeException(e); } } } }; zk = new ZooKeeper(HostPort, 2000, connectionWatcher); // Ensure the parent znode exists if(zk.exists(membershipRoot, false) == null) { zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // Set a watch on the parent znode List<String> children = zk.getChildren(membershipRoot, childrenWatcher); System.err.println("Members: " + children); } public synchronized void close() { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public void 
wall (String message) { System.out.printf("\nMESSAGE: %s", message); } public void run() { try { synchronized (this) { while (alive) { wait(); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { this.close(); } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { if (args.length != 1) { System.err.println("Usage: ClusterMonitor <Host:Port>"); System.exit(0); } String hostPort = args[0]; new ClusterMonitor(hostPort).run(); } }ClusterClient.java类定义如下:
import java.io.IOException; import java.lang.management.ManagementFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class ClusterClient implements Watcher, Runnable { private static String membershipRoot = "/Members"; ZooKeeper zk; public ClusterClient(String hostPort, Long pid) { String processId = pid.toString(); try { zk = new ZooKeeper(hostPort, 2000, this); } catch (IOException e) { e.printStackTrace(); } if (zk != null) { try { zk.create(membershipRoot + 'http://www.likecs.com/' + processId, processId.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch ( KeeperException | InterruptedException e) { e.printStackTrace(); } } } public synchronized void close() { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { System.out.printf("\nEvent Received: %s", event.toString()); } public void run() { try { synchronized (this) { while (true) { wait(); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { this.close(); } } public static void main(String[] args) { if (args.length != 1) { System.err.println("Usage: ClusterClient <Host:Port>"); System.exit(0); } String hostPort = args[0]; //Get the process id String name = ManagementFactory.getRuntimeMXBean().getName(); int index = name.indexOf('@'); Long processId = Long.parseLong(name.substring(0, index)); new ClusterClient(hostPort, processId).run(); } }使用下面命令编译这两个类:
$ javac -cp $CLASSPATH ClusterMonitor.java
$ javac -cp $CLASSPATH ClusterClient.java

要执行群集监控模型,打开两个终端。 在其中一个终端中,运行ClusterMonitor类。 在另一个终端中,通过在后台运行ClusterClient类来执行多个实例。
在第一个终端中,执行ClusterMonitor类:
$ java -cp $CLASSPATH ClusterMonitor localhost:2181