public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 底层存储结构-数组 */
final Object[] items;
/** 队首元素下标 */
int takeIndex;
/** 队尾元素下标 */
int putIndex;
/**队列元素总数 */
int count;
/** 重入锁 */
final ReentrantLock lock;
/** notEmpty等待条件 */
private final Condition notEmpty;
/** notFull等待条件 */
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;
可以看到,ArrayBlockingQueue用来存储元素的实际上是一个数组。
再看下ArrayBlockingQueue两个重要方法的实现,put()和take():
public void put(E e) throws InterruptedException { //先检查e是否为空 checkNotNull(e); //获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //当队列已满,进入条件等待 while (count == items.length) notFull.await(); //队列不满,进行入队列操作 enqueue(e); } finally { //释放锁 lock.unlock(); } }
再看下具体的入队操作:
private void enqueue(E x) { final Object[] items = this.items; //队尾入队 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; //队列总数+1 count++; //notempty条件的等待集中随机选择一个线程,解除其阻塞状态 notEmpty.signal(); }
下面是take()方法的源代码:
public E take() throws InterruptedException { //获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列为空 while (count == 0) //线程加入notEmpty条件等待集 notEmpty.await(); //非空,出队列 return dequeue(); } finally { //释放锁 lock.unlock(); } }
4、阻塞队列的应用:实现消费者-生产者模式/** * @author 作者:徐剑 E-mail:anxu_2013@163.com * @version 创建时间:2016年3月20日 下午2:21:55 * 类说明:阻塞队列实现的消费者-生产者模式 */ public class Test { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { try { queue.take(); System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { try { queue.put(1); System.out.println("向队列取中插入一个元素,队列剩余空间:"+ (queueSize - queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } }