由于BlockingQueue相关的子类众多,我们仅以ArrayBlockingQueue从源码角度分析相关实现。
构造方法
ArrayBlockingQueue中定义的成员变量如下:
各变量的解释如下,以便了解后续的代码:
items用于存储具体的元素
takeIndex元素索引,用于记录下次获取元素的位置
putIndex元素索引,用于记录下次插入元素的位置
count用于记录当前队列中元素的个数
notEmpty条件变量,此处为获取元素的条件,即队列不能为空,否则线程阻塞
notFull条件变量,此处为插入元素的条件,即队列不能已满,否则线程阻塞
itrs用于维护迭代器相关内容
内部结构如下:
构造方法如下:
public ArrayBlockingQueue(int capacity) { this(capacity, false); //(1) } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //(2) lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); //(3) notFull = lock.newCondition(); //(4) } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { //(5) checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }(1):默认情况下,非公平模式,即抢占式
(2):数组初始化
(3)/(4):条件变量初始化
(5):如果构造方法中,含有初始化集合的话,则将对应元素添加到内部数组,并更改count和putIndex的值。
插入数据
插入数据,我们主要看put()方法的实现,重点看生产者和消费者插入和获取数据时,线程何时阻塞,同时又何时唤醒。
(1):非空检查,插入的元素不能为null,否则抛出NullPointerException
(2):获取互斥锁
(3):如果当前队列的元素个数等于队列总长度,即队列已满,则通过条件变量,释放和notFull相关的锁,当前线程阻塞。当前线程唤醒的条件如下:
其他某个线程调用此 Condition 的 signal() 方法,并且碰巧将当前线程选为被唤醒的线程;
或者其他某个线程调用此 Condition 的 signalAll() 方法;
或者其他某个线程中断当前线程,且支持中断线程的挂起;
或者发生“虚假唤醒”
(5):如果队列未满,则将元素添加的putIndex索引的位置
(6):putIndex增加1后和队列长度相等,即已到达队列尾部,则putIndex置0
(7):队列已有元素数量加1
(8):通知notEmpty条件变量,唤醒等待获取元素的线程
(4):释放互斥锁
可以看到ArrayBlockingQueue每次插入元素后,都会去唤醒等待获取元素的线程。