public void put(E paramE)
throws InterruptedException
{
// 确保放入的元素是非空的
if (paramE == null) {
throw new NullPointerException();
}
int i = -1;
Node localNode = new Node(paramE);
ReentrantLock localReentrantLock = this.putLock;
AtomicInteger localAtomicInteger = this.count;
// 响应中断
localReentrantLock.lockInterruptibly();
try
{
while (localAtomicInteger.get() == this.capacity) {
this.notFull.await(); // 阻塞
}
enqueue(localNode);
i = localAtomicInteger.getAndIncrement();
if (i + 1 < this.capacity) {
this.notFull.signal(); ///使用了signal()提高性能, put的时候对notFull 条件队列中阻塞的线程进行唤醒。
}
}
finally
{
localReentrantLock.unlock();
}
if (i == 0) {
signalNotEmpty(); // put 的时候,会在i == 0 的情况下对 notEmpty 条件队列中阻塞的线程进行唤醒,但是这个需要获取takeLock锁。
}
}
public E take() throws InterruptedException { int i = -1; AtomicInteger localAtomicInteger = this.count; ReentrantLock localReentrantLock = this.takeLock; localReentrantLock.lockInterruptibly(); Object localObject1; try { while (localAtomicInteger.get() == 0) { this.notEmpty.await(); } localObject1 = dequeue(); i = localAtomicInteger.getAndDecrement(); if (i > 1) { this.notEmpty.signal(); } } finally { localReentrantLock.unlock(); } if (i == this.capacity) { signalNotFull(); } return (E)localObject1; }
跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号
LinkedBlockingQueue内部维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。
而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
LinkedBlockingQueue内部使用ReentrantLock实现插入锁(putLock)和取出锁(takeLock)。putLock上的条件变量是notFull,即可以用notFull唤醒阻塞在putLock上的线程。takeLock上的条件变量是notEmtpy,即可用notEmpty唤醒阻塞在takeLock上的线程。