Zookeeper 分布式锁 (图解+秒懂+史上最全) (7)

创建了自己的临时节点后,调用checkLocked方法,判断是否是锁定成功。如果锁定成功,则返回true;如果自己没有获得锁,则要监听前一个节点,此时需要找出前一个节点的路径,并保存在
prior_path
成员中,供后面的await()等待方法去监听使用。在进入await()等待方法的介绍前,先说下checkLocked
锁定判断方法。

3.checkLocked()检查是否持有锁

在checkLocked()方法中,判断是否可以持有锁。判断规则很简单:当前创建的节点,是否在上一步获取到的子节点列表的第一个位置:

(1)如果是,说明可以持有锁,返回true,表示加锁成功;

(2)如果不是,说明有其他线程早已先持有了锁,返回false。

checkLocked()方法的代码如下:

private boolean checkLocked(List<String> waiters) { //节点按照编号,升序排列 Collections.sort(waiters); // 如果是第一个,代表自己已经获得了锁 if (locked_short_path.equals(waiters.get(0))) { log.info("成功的获取分布式锁,节点为{}", locked_short_path); return true; } return false; }

checkLocked方法比较简单,将参与排队的所有子节点列表,从小到大根据节点名称进行排序。排序主要依靠节点的编号,也就是后Znode路径的10位数字,因为前缀都是一样的。排序之后,做判断,如果自己的locked_short_path编号位置排在第一个,如果是,则代表自己已经获得了锁。如果不是,则会返回false。

如果checkLocked()为false,外层的调用方法,一般来说会执行await()等待方法,执行夺锁失败以后的等待逻辑。

4.await()监听前一个节点释放锁

await()也很简单,就是监听前一个ZNode节点(prior_path成员)的删除事件,代码如下:

private void await() throws Exception { if (null == prior_path) { throw new Exception("prior_path error"); } final CountDownLatch latch = new CountDownLatch(1); //订阅比自己次小顺序节点的删除事件 Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听到的变化 watchedEvent = " + watchedEvent); log.info("[WatchedEvent]节点删除"); latch.countDown(); } }; client.getData().usingWatcher(w).forPath(prior_path); /* //订阅比自己次小顺序节点的删除事件 TreeCache treeCache = new TreeCache(client, prior_path); TreeCacheListener l = new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (data != null) { switch (event.getType()) { case NODE_REMOVED: log.debug("[TreeCache]节点删除, path={}, data={}", data.getPath(), data.getData()); latch.countDown(); break; default: break; } } } }; treeCache.getListenable().addListener(l); treeCache.start();*/ latch.await(WAIT_TIME, TimeUnit.SECONDS); }

首先添加一个Watcher监听,而监听的节点,正是前面所保存在prior_path成员的前一个节点的路径。这里,仅仅去监听自己前一个节点的变动,而不是其他节点的变动,提升效率。完成监听之后,调用latch.await(),线程进入等待状态,一直到线程被监听回调代码中的latch.countDown() 所唤醒,或者等待超时。

说 明

以上代码用到的CountDownLatch的核心原理和实战知识,《Netty Zookeeper Redis 高并发实战》姊妹篇 《Java高并发核心编程(卷2)》。

上面的代码中,监听前一个节点的删除,可以使用两种监听方式:

(1)Watcher 订阅;

(2)TreeCache 订阅。

两种方式的效果,都差不多。但是这里的删除事件,只需要监听一次即可,不需要反复监听,所以使用的是Watcher
一次性订阅。而TreeCache 订阅的代码在源码工程中已经被注释,仅仅供大家参考。

一旦前一个节点prior_path节点被删除,那么就将线程从等待状态唤醒,重新一轮的锁的争夺,直到获取锁,并且完成业务处理。

至此,分布式Lock加锁的算法,还差一点就介绍完成。这一点,就是实现锁的可重入。

5.可重入的实现代码

什么是可重入呢?只需要保障同一个线程进入加锁的代码,可以重复加锁成功即可。
修改前面的lock方法,在前面加上可重入的判断逻辑。代码如下:

@Override public boolean lock() { //可重入的判断 synchronized (this) { if (lockCount.get() == 0) { thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { if (!thread.equals(Thread.currentThread())) { return false; } lockCount.incrementAndGet(); return true; } } //.... }

为了变成可重入,在代码中增加了一个加锁的计数器lockCount
,计算重复加锁的次数。如果是同一个线程加锁,只需要增加次数,直接返回,表示加锁成功。

至此,lock()方法已经介绍完成,接下来,就是去释放锁

实战:释放锁的实现

Lock接口中的unLock()方法,表示释放锁,释放锁主要有两个工作:

(1)减少重入锁的计数,如果最终的值不是0,直接返回,表示成功的释放了一次;

(2)如果计数器为0,移除Watchers监听器,并且删除创建的Znode临时节点。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwsydw.html