并发条件队列之Condition 精讲 (2)

       例如,假设我们有一个有界缓冲区,它支持put和take方法。 如果尝试在空缓冲区上进行take ,则线程将阻塞,直到有可用项为止。 如果尝试在完整的缓冲区上进行put ,则线程将阻塞,直到有可用空间为止。 我们希望继续等待put线程,并在单独的等待集中take线程,以便我们可以使用仅当缓冲区中的项目或空间可用时才通知单个线程的优化。 这可以使用两个Condition实例来实现一个典型的生产者-消费者模型。这里在同一个lock锁上,创建了两个条件队列fullCondition, notFullCondition。当队列已满,没有存储空间时,put方法在notFull条件上等待,直到队列不是满的;当队列空了,没有数据可读时,take方法在notEmpty条件上等待,直到队列不为空,而notEmpty.signal()和notFull.signal()则用来唤醒等待在这个条件上的线程。

public class BoundedQueue { /** * 生产者容器 */ private LinkedList<Object> buffer; /** * 容器最大值是多少 */ private int maxSize; /** * 锁 */ private Lock lock; /** * 满了 */ private Condition fullCondition; /** * 不满 */ private Condition notFullCondition; BoundedQueue(int maxSize) { this.maxSize = maxSize; buffer = new LinkedList<Object>(); lock = new ReentrantLock(); fullCondition = lock.newCondition(); notFullCondition = lock.newCondition(); } /** * 生产者 * * @param obj * @throws InterruptedException */ public void put(Object obj) throws InterruptedException { //获取锁 lock.lock(); try { while (maxSize == buffer.size()) { System.out.println(Thread.currentThread().getName() + "此时队列满了,添加的线程进入等待状态"); // 队列满了,添加的线程进入等待状态 notFullCondition.await(); } buffer.add(obj); //通知 fullCondition.signal(); } finally { lock.unlock(); } } /** * 消费者 * * @return * @throws InterruptedException */ public Object take() throws InterruptedException { Object obj; lock.lock(); try { while (buffer.size() == 0) { System.out.println(Thread.currentThread().getName() + "此时队列空了线程进入等待状态"); // 队列空了线程进入等待状态 fullCondition.await(); } obj = buffer.poll(); //通知 notFullCondition.signal(); } finally { lock.unlock(); } return obj; } public static void main(String[] args) { // 初始化最大能放2个元素的队列 BoundedQueue boundedQueue = new BoundedQueue(2); for (int i = 0; i < 3; i++) { Thread thread = new Thread(() -> { try { boundedQueue.put("元素"); System.out.println(Thread.currentThread().getName() + "生产了元素"); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.setName("线程" + i); thread.start(); } for (int i = 0; i < 3; i++) { Thread thread = new Thread(() -> { try { boundedQueue.take(); System.out.println(Thread.currentThread().getName() + "消费了元素"); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.setName("线程" + i); thread.start(); } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }

输出结果:

图片: https://uploader.shimo.im/f/ZbYdtEOSIvSGoInd.png

5. 源码分析

Condition接口中的方法

1. await()

       实现可中断条件等待,其实我们以上案例是利用ReentrantLock来实现的生产者消费者案例,进去看源码发现其实实现该方法的是 AbstractQueuedSynchronizer 中ConditionObject实现的
       将节点添加进同步队列中,并要么立即唤醒线程,要么等待前驱节点释放锁后将自己唤醒,无论怎样,被唤醒的线程要从哪里恢复执行呢?调用了await方法的地方

中断模式interruptMode这个变量记录中断事件,该变量有三个值:

0 : 代表整个过程中一直没有中断发生。

THROW_IE : 表示退出await()方法时需要抛出InterruptedException,这种模式对应于中断发生在signal之前

REINTERRUPT : 表示退出await()方法时只需要再自我中断以下,这种模式对应于中断发生在signal之后。

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加节点到条件队列中 Node node = addConditionWaiter(); // 释放当前线程所占用的锁,保存当前的锁状态 int savedState = fullyRelease(node); int interruptMode = 0; // 如果当前队列不在同步队列中,说明刚刚被await, 还没有人调用signal方法, // 则直接将当前线程挂起 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 线程挂起的地方 // 线程将在这里被挂起,停止运行 // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了 // 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 线程将在同步队列中利用进行acquireQueued方法进行“阻塞式”争锁, // 抢到锁就返回,抢不到锁就继续被挂起。因此,当await()方法返回时, // 必然是保证了当前线程已经持有了lock锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssfjf.html