Java并发基础-并发工具类(二) (4)

由于BlockingQueue相关的子类众多,我们仅以ArrayBlockingQueue从源码角度分析相关实现。
构造方法
ArrayBlockingQueue中定义的成员变量如下:

final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; transient Itrs itrs = null

各变量的解释如下,以便了解后续的代码:

items用于存储具体的元素

takeIndex元素索引,用于记录下次获取元素的位置

putIndex元素索引,用于记录下次插入元素的位置

count用于记录当前队列中元素的个数

notEmpty条件变量,此处为获取元素的条件,即队列不能为空,否则线程阻塞

notFull条件变量,此处为插入元素的条件,即队列不能已满,否则线程阻塞

itrs用于维护迭代器相关内容

内部结构如下:

构造方法如下:

public ArrayBlockingQueue(int capacity) { this(capacity, false); //(1) } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //(2) lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); //(3) notFull = lock.newCondition(); //(4) } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { //(5) checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }

(1):默认情况下,非公平模式,即抢占式
(2):数组初始化
(3)/(4):条件变量初始化
(5):如果构造方法中,含有初始化集合的话,则将对应元素添加到内部数组,并更改count和putIndex的值。

插入数据
插入数据,我们主要看put()方法的实现,重点看生产者和消费者插入和获取数据时,线程何时阻塞,同时又何时唤醒。

public void put(E e) throws InterruptedException { checkNotNull(e); //(1) final ReentrantLock lock = this.lock; //(2) lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //(3) enqueue(e); } finally { lock.unlock(); //(4) } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; //(5) if (++putIndex == items.length) //(6) putIndex = 0; count++; //(7) notEmpty.signal(); //(8) }

(1):非空检查,插入的元素不能为null,否则抛出NullPointerException
(2):获取互斥锁
(3):如果当前队列的元素个数等于队列总长度,即队列已满,则通过条件变量,释放和notFull相关的锁,当前线程阻塞。当前线程唤醒的条件如下:

其他某个线程调用此 Condition 的 signal() 方法,并且碰巧将当前线程选为被唤醒的线程;

或者其他某个线程调用此 Condition 的 signalAll() 方法;

或者其他某个线程中断当前线程,且支持中断线程的挂起;

或者发生“虚假唤醒”

(5):如果队列未满,则将元素添加的putIndex索引的位置
(6):putIndex增加1后和队列长度相等,即已到达队列尾部,则putIndex置0
(7):队列已有元素数量加1
(8):通知notEmpty条件变量,唤醒等待获取元素的线程
(4):释放互斥锁
可以看到ArrayBlockingQueue每次插入元素后,都会去唤醒等待获取元素的线程。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zydzdg.html