☕【Java深层系列】「并发编程系列」让我们一起探索一下CyclicBarrier的技术原理和源码分析 (3)

打破平衡,并设置打破平衡的标志,然后再唤醒所有被阻塞的线程;

/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; // 设置打破平衡的标志 count = parties; // 重新还原count为初始值 trip.signalAll(); // 发送信号量,唤醒所有Condition中的等待队列 } nextGeneration()

唤醒所有在Condition中等待的队列,然后还原初始状态值,并且重新换掉generation的引用,改朝换代,为下一轮操作做准备;

/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } AQS的await()

CyclicBarrier的成员属性 trip( Condition类型 ) 对象的方法:

该AQS的await方法,因为该方法涉及到为什么用了独占锁lock.lock之后,dowait方法里面通过调用了trip.await()进行阻塞的话,第二个、第三个线程怎么还会通过lock.lock调用之后还能进入临界区呢。

AQS的方法会调用fullyRelease(node)释放当前线程占有的锁,所以lock.lock才不至于一直被阻塞在那里;

并且Condition也维护了自己的一个链表,凡是通过调用trip.await()方法的线程,都会首先进入Condition的队列,然后释放独占锁,想办法调用park方法锁住当前线程;

然后在被信号量通知的时候,又会将Condition队列的结点转移到AQS的同步队列中,然后等待调用unlock逐个释放锁;

/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 将当前线程包装一下,然后添加到Condition自己维护的链表队列中 int savedState = fullyRelease(node); // 释放当前线程占有的锁,如果不释放的话,那么在第二次调用lock.lock()的地方; // 如果第一个没执行完的话,那么则会一直阻塞等待,那么也就无法完成栅栏的功能了。 int interruptMode = 0; while (!isOnSyncQueue(node)) { // 是否在AQS的队列中 LockSupport.park(this); // 如果不在AQS队列中的话,则阻塞等待,这里才是最最最核心阻塞的地方 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 如果在AQS队列中的话,那么则考虑重入锁,重新竞争锁,重新休息 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } CyclicBarrier的实战用法 CyclicBarrier提供2个构造器: public CyclicBarrier(int parties, Runnable barrierAction) {} public CyclicBarrier(int parties) {}

parties:指让多少个线程或者任务等待至barrier状态;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzjpxx.html