Condition实现原理

Condition接口提供了与Object阻塞(wait())与唤醒(notify()或notifyAll())相似的功能,只不过Condition接口提供了更为丰富的功能,如:限定等待时长等。Condition需要与Lock结合使用,需要通过锁对象获取Condition。

一、基本使用

基于Condition实现生产者、消费者模式。代码基本与Object#wait()和Object#notify()类似,只不过我们使用Lock替换了synchronized关键字。
生产者

public class Producer implements Runnable { private Lock lock; private Condition condition; private Queue<String> queue; private int maxSize; public Producer(Lock lock, Condition condition, Queue<String> queue, int maxSize) { this.lock = lock; this.condition = condition; this.queue = queue; this.maxSize = maxSize; } @Override public void run() { int i = 0; for (; ; ) { lock.lock(); // 如果满了,则阻塞 while (queue.size() == maxSize) { System.out.println("生产者队列满了,等待..."); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } queue.add("一个消息:" + ++i); System.out.printf("生产者%s生产了一个消息:%s\n", Thread.currentThread().getName(), i); condition.signal(); lock.unlock(); } } }

消费者

public class Consumer implements Runnable { private Lock lock; private Condition condition; private Queue<String> queue; private int maxSize; public Consumer(Lock lock, Condition condition, Queue<String> queue, int maxSize) { this.lock = lock; this.condition = condition; this.queue = queue; this.maxSize = maxSize; } @Override public void run() { for (; ; ) { lock.lock(); while (queue.isEmpty()) { System.out.println("消费者队列为空,等待..."); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } String obj = queue.remove(); System.out.printf("消费者%s消费一个消息:%s\n", Thread.currentThread().getName(), obj); condition.signal(); lock.unlock(); } } }

测试类

public class ConditionProducerConsumer { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Queue<String> queue = new LinkedBlockingQueue<>(); int maxSize = 10; Producer producer = new Producer(lock, condition, queue, maxSize); Consumer consumer = new Consumer(lock, condition, queue, maxSize); new Thread(producer).start(); new Thread(consumer).start(); } } 二、源码分析

上述示例中使用的Lock是ReentrantLock,关于它的lock方法与unlock方法的原理详见ReentrantLock实现原理。上述示例中的Condition对象是调用了Lock#newCondition()方法,源码如下:

public class ReentrantLock implements Lock, java.io.Serializable { ... public Condition newCondition() { return sync.newCondition(); } abstract static class Sync extends AbstractQueuedSynchronizer { ... final ConditionObject newCondition() { return new ConditionObject(); } ... } ... }

上述的ConditionObject定义在AQS中,如下:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... public class ConditionObject implements Condition, java.io.Serializable { ... } ... }

首先来分析下Condition#await()方法

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

根据AQS队列的特性,若有多个线程执行lock#lock()方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:

image


假设当前是线程A获取到锁,其他线程执行lock#lock()方法时,将会构建成一个上述链表。
若获取锁的线程(线程A)执行Condition#await()方法,则会将当前线程添加至Condition队列中,如下:

image


然后在调用fullyRelease()方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:

image


在调用isOnSyncQueue()方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()进行阻塞。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgyxzp.html