利用ZooKeeper的发布/订阅模式实现配置动态变更

ZooKeeper的Watcher事件机制可以说分布式场景下的观察者模式的实现。基于这个watcher事件机制,配合注册到特定的ZNode节点,可以实现java应用的配置运行时的变更。在学习zookeeper之前,听同事说配置可以在运行时动态变更,觉得不可思议。研习了zookeeper之后,实现这个功能是很easy的。

  发布/订阅系统设计起来无非两种模式,推和拉。

1. 推模式,服务端负责把变更的数据推给订阅的客户端。Web即时通信里的Comet技术便可以实现这种功能。
2. 拉模式,也就是客户端定时轮询服务端。拉模式不仅有延迟,给服务端带来很大压力,而且十分低效。

  zookeeper采用的是推拉结合的模式
1. 客户端订阅znode节点
2. 被订阅的节点发生变化后,zookeeper服务端向客户端发生数据变更的watcher事件通知
3. 客户端接收到watcher通知后,主动从服务端拉取变更的数据

  阿里的配置变更中间件diamond,同样是基于推拉结合的模式来实现数据的动态变更。初学zookeeper,写了一个数据库配置动态变更的demo。zookeeper自带的Watcher注册后,数据变更一次便会自动取消注册。这个设计实在反人类,大多数的开发者的需求肯定是注册一次,服务终生。所以转向开源的ZkClient客户端。
  首先本地需要启动一个zookeeper的服务端,并且有一个“/db”节点,我用这个节点存储数据库配置信息。
  客户端工程需要引入zookeeper和zkclient的依赖。

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.9</version>
</dependency>
<dependency>
    <groupId>com.github.adyliu</groupId>
    <artifactId>zkclient</artifactId>
    <version>2.1.1</version>
</dependency>
  客户端demo

import com.github.zkclient.IZkDataListener;
import com.github.zkclient.ZkClient;

import java.io.IOException;

/**
 * project  : zk
 * package  : PACKAGE_NAME
 * author  : lvsheng
 * date    : 2016/10/5 下午11:17
 */
public class ZkClientTest {

public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        zkClient.subscribeDataChanges("/db", new IZkDataListener() {
            public void handleDataChange(String dataPath, byte[] data) throws Exception {
                System.out.println(new String(data));
            }

public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(dataPath);
            }
        });

try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  现在我们来测试这个功能,在控制台依次输入如下命令:

[zk: localhost:2181(CONNECTED) 48] set /db chat.jdbc.driver=com.mysql.jdbc.Driver|chat.jdbc.url=jdbc:mysql://192.168.146.120:3306/chat_test?useUnicode=true&amp;amp;characterEncoding=GBK|chat.jdbc.maxActive=5

[zk: localhost:2181(CONNECTED) 49] set /db chat.jdbc.driver=com.mysql.jdbc.Driver|chat.jdbc.url=jdbc:mysql://127.0.0.1:3306/chat_test?useUnicode=true&amp;amp;characterEncoding=GBK|chat.jdbc.maxActive=5         

  观察Java代码的输出

chat.jdbc.driver=com.mysql.jdbc.Driver|chat.jdbc.url=jdbc:mysql://192.168.146.120:3306/chat_test?useUnicode=true&amp;amp;characterEncoding=GBK|chat.jdbc.maxActive=5
chat.jdbc.driver=com.mysql.jdbc.Driver|chat.jdbc.url=jdbc:mysql://127.0.0.1:3306/chat_test?useUnicode=true&amp;amp;characterEncoding=GBK|chat.jdbc.maxActive=5

  数据库配置的ip被成功的动态修改。

ZooKeeper学习总结 

Ubuntu 14.04安装分布式存储Sheepdog+ZooKeeper 

CentOS 6安装sheepdog 虚拟机分布式储存 

ZooKeeper集群配置

使用ZooKeeper实现分布式共享锁

分布式服务框架 ZooKeeper -- 管理分布式环境中的数据

ZooKeeper集群环境搭建实践

ZooKeeper服务器集群环境配置实测

ZooKeeper集群安装

Zookeeper3.4.6的安装

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/65f2b7086c25aa4fc2547ec4750d3924.html