(4)一旦队列中的后面的节点,获得前一个子节点变更通知,则开始进行判断,判断自己是否为当前子节点列表中序号最小的子节点,如果是,则认为加锁成功;如果不是,则持续监听,一直到获得锁。
(5)获取锁后,开始处理业务流程。完成业务流程后,删除自己的对应的子节点,完成释放锁的工作,以方面后继节点能捕获到节点变更通知,获得分布式锁。
实战:加锁的实现Lock接口中加锁的方法是lock()。lock()方法的大致流程是:首先尝试着去加锁,如果加锁失败就去等待,然后再重复。
1.lock()方法的实现代码lock()方法加锁的实现代码,大致如下:
package com.crazymakercircle.zk.distributedLock; import com.crazymakercircle.zk.ZKclient; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * create by 尼恩 @ 疯狂创客圈 **/ @Slf4j public class ZkLock implements Lock { //ZkLock的节点链接 private static final String ZK_PATH = "/test/lock"; private static final String LOCK_PREFIX = ZK_PATH + "http://www.likecs.com/"; private static final long WAIT_TIME = 1000; //Zk客户端 CuratorFramework client = null; private String locked_short_path = null; private String locked_path = null; private String prior_path = null; final AtomicInteger lockCount = new AtomicInteger(0); private Thread thread; public ZkLock() { ZKclient.instance.init(); synchronized (ZKclient.instance) { if (!ZKclient.instance.isNodeExist(ZK_PATH)) { ZKclient.instance.createNode(ZK_PATH, null); } } client = ZKclient.instance.getClient(); } @Override public boolean lock() { //可重入,确保同一线程,可以重复加锁 synchronized (this) { if (lockCount.get() == 0) { thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { if (!thread.equals(Thread.currentThread())) { return false; } lockCount.incrementAndGet(); return true; } } try { boolean locked = false; //首先尝试着去加锁 locked = tryLock(); if (locked) { return true; } //如果加锁失败就去等待 while (!locked) { await(); //获取等待的子节点列表 List<String> waiters = getWaiters(); //判断,是否加锁成功 if (checkLocked(waiters)) { locked = true; } } return true; } catch (Exception e) { e.printStackTrace(); unlock(); } return false; } //...省略其他的方法 } 2.tryLock()尝试加锁尝试加锁的tryLock方法是关键,做了两件重要的事情:
(1)创建临时顺序节点,并且保存自己的节点路径
(2)判断是否是第一个,如果是第一个,则加锁成功。如果不是,就找到前一个Znode节点,并且保存其路径到prior_path。
尝试加锁的tryLock方法,其实现代码如下:
/** * 尝试加锁 * @return 是否加锁成功 * @throws Exception 异常 */ private boolean tryLock() throws Exception { //创建临时Znode locked_path = ZKclient.instance .createEphemeralSeqNode(LOCK_PREFIX); //然后获取所有节点 List<String> waiters = getWaiters(); if (null == locked_path) { throw new Exception("zk error"); } //取得加锁的排队编号 locked_short_path = getShortPath(locked_path); //获取等待的子节点列表,判断自己是否第一个 if (checkLocked(waiters)) { return true; } // 判断自己排第几个 int index = Collections.binarySearch(waiters, locked_short_path); if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了 throw new Exception("节点没有找到: " + locked_short_path); } //如果自己没有获得锁,则要监听前一个节点 prior_path = ZK_PATH + "http://www.likecs.com/" + waiters.get(index - 1); return false; } private String getShortPath(String locked_path) { int index = locked_path.lastIndexOf(ZK_PATH + "http://www.likecs.com/"); if (index >= 0) { index += ZK_PATH.length() + 1; return index <= locked_path.length() ? locked_path.substring(index) : ""; } return null; }创建临时顺序节点后,其完整路径存放在locked_path成员中;另外还截取了一个后缀路径,放在
locked_short_path成员中,后缀路径是一个短路径,只有完整路径的最后一层。为什么要单独保存短路径呢?
因为,在获取的远程子节点列表中的其他路径返回结果时,返回的都是短路径,都只有最后一层路径。所以为了方便后续进行比较,也把自己的短路径保存下来。