by:授客 QQ:1033553122
测试环境Win7 64位
Python 3.3.4
kazoo-2.6.1-py2.py3-none-any.whl(windows)
kazoo-2.6.1.tar.gz (linux)
zookeeper-3.4.13.tar.gz
下载地址:
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
代码实践
kazooStudy.py
#!/usr/bin/env python 3.4.0
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
import threading
import time
from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.retry import KazooRetry
def restart_zk_client():
'''重启zookeeper会话'''
global zk_client
global zk_conn_stat
try:
zk_client.restart()
except Exception as e:
print('重启zookeeper客户端异常:%s' % e)
zk_conn_stat = 0 # zookeeper连接状态 1-LOST 2-SUSPENDED 3-CONNECTED/RECONNECTED
def zk_conn_listener(state):
'''zookeeper连接状态监听器'''
global zk_conn_stat
if state == KazooState.LOST:
print('zookeeper connection lost')
zk_conn_stat = 1
# Register somewhere that the session was lost
thread = threading.Thread(target=restart_zk_client)
thread.start()
elif state == KazooState.SUSPENDED:
print('zookeeper connection dicconnected')
zk_conn_stat = 2
# Handle being disconnected from Zookeeper
else:
zk_conn_stat = 3
print('zookeeper connection cconnected/reconnected')
# Handle being connected/reconnected to Zookeeper
# 监视器
# 当节点有变化、节点被删除时,将以多线程的方式调用以参数形式传递给get()、exists()的监视函数,监视函数将会接收到一个WatchedEvent实例
def event_listener(event):
print(event)
if __name__ == '__main__':
try:
# 建立连接
zk_client = KazooClient(hosts='127.0.0.1:2181')
zk_client.add_listener(zk_conn_listener) # 添加监听器,监听连接状态
zk_client.start() # 初始化到zk的连接,可以设置超时时间 zk_client.start(timeout=15) 默认15秒
print('zk_client state:', zk_client.state) # 查看链接状态
# 创建节点
# ensure_path() 递归创建path中不存在的节点,但是不能为节点设置数据,仅ACL.
zk_client.ensure_path('/node1')
# 创建永久节点
# create创建节点的同时,可为节点设置数据,要求path路径必须存在
if not zk_client.exists('/node1/subNode1'):
zk_client.create('/node1/subNode1', b'sub node1')
# 创建临时节点
# 注意:会话丢失、重启会话会导致zookeeper删除重启会话前创建的临时节点
if not zk_client.exists('/node1/subNode2'):
zk_client.create('/node1/subNode2', b'sub node2', ephemeral=True)
# 创建有序临时节点
zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)
# 读取数据
# 判断节点是否存在
if zk_client.exists('/node1'): # 如果返回值为None则表示不存在给定节点
print('存在节点node1,节点路径/node1')
# 获取节点相关数据
data, stat = zk_client.get('/node1')
if stat:
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
# 获取给定节点的子节点
children = zk_client.get_children('/node1')
print('node1子节点 有 %s 子节点,节点名称为: %s' % (len(children), children))
print('/ 子节点', zk_client.get_children('http://www.likecs.com/'))
# 更新节点
# 更新节点数据
zk_client.set("/node1/subNode2", b"some new data")
# 删除节点 recursive参数可选,递归删除节点数据
zk_client.delete("/node1", recursive=True)
# 重试命令
try:
result = zk_client.retry(zk_client.get, "/node1/subNode3")
print(result)
# 自定义重试
# max_tries 出错最大重试次数, ignore_expire False-重试的时候忽略会话过期,否则不忽略
kr = KazooRetry(max_tries=3, ignore_expire=False)
result = kr(zk_client.get, "/node1/subNode3")
except Exception as e:
print('/node1/subNode3 不存在,所以会运行出错')
# 释放客户端占用资源,移除连接
zk_client.stop()
# zk_client.stop() 会导致zk_client连接状态变成 LOST,进而触发线程调用函数 restart_zk_client,
# 该函数未执行完成的情况下,如果马上执行类似get,create等函数,会导致运行出错
#
while zk_conn_stat != 3:
continue
else:
i = 0
while i < 3000:
if i % 200 == 0:
time.sleep(2)
print('创建新节点')
zk_client.ensure_path('/node1')
zk_client.ensure_path('/node1/subNode2')