Java并发之阻塞队列浅析(3)

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; }

 从上边可以明显的看出ArrayBlockingQueue用一个数组来存储数据,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。 lock是一个可重入锁,notEmpty和notFull是等待条件。

 然后看它的一个关键方法的实现:put()

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }
}

首选检查元素是否为空,为空则抛出异常

接着实例化可重入锁

然后localReentrantLock.lockInterruptibly();这里特别强调一下 (lockInterruptibly()允许在等待时由其他线程的Thread.interrupt()方法来中断等待线程而直接返回,这时是不用获取锁的,而会抛出一个InterruptException。  而ReentrantLock.lock()方法则不允许Thread.interrupt()中断,即使检测到了Thread.interruptted一样会继续尝试获取锁,失败则继续休眠。只是在最后获取锁成功之后在把当前线程置为中断状态)

判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,即当队列满的时候,将会等待

将元素插入到队列中

解锁(这里一定要在finally中解锁啊!!!)

enqueue(E x)将元素插入到数组啊item中

/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }

该方法内部通过putIndex索引直接将元素添加到数组items中

    这里思考一个问题 为什么当putIndex索引大小等于数组长度时,需要将putIndex重新设置为0?

   这是因为当队列是先进先出的 所以获取元素总是从队列头部获取,而添加元素从中从队列尾部获取。所以当队列索引(从0开始)与数组长度相等时,所以下次我们就需要从数组头部开始添加了;

最后当插入成功后,通过notEmpty唤醒正在等待取元素的线程

阻塞队列中和put对应的就是take了

下边是take方法的实现

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); finally { lock.unlock(); } }

take方法其实很简单,队列中有数据就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么put操作将会唤醒take线程; 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/89ef0d69516bc50721abf2d5fae64759.html