可以看到take的实现跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。(等的就是上文的put中的信号)当数组的数量为空时,也就是无任何数据可以被取出来的时候,notEmpty这个Condition就会进行阻塞,直到被notEmpty唤醒
dequeue的实现如下
private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
take方法主要是从队列头部取元素,可以看到takeIndex是取元素的时候的偏移值,而put中是putIndex控制添加元素的偏移量,由此可见,put和take操作的偏移量分别是由putIndex和takeIndex控制的。其实仔细观察put和take的实现思路是有很多相似之处。
offer(E o, long timeout, TimeUnit unit)的实现方式其实和put的思想是差不多的区别是 offer在阻塞的时候调用的不是await()方法而是awaitNanos(long nanosTimeout) 带超时响应的等待(PS:具体区别可以参考我之前写的关于锁的博客《JAVA并发之锁的使用浅析》)
poll(long timeout, TimeUnit unit)的实现也是这样在take的基础上加了超时响应。感兴趣的朋友可以自行去看一下
案例分析模拟食堂的经历,食堂窗口端出一道菜放在台面,然后等待顾客消费。写到代码里就是食堂窗口就是一个生产者线程,顾客就是消费者线程,台面就是阻塞队列。
public class TestBlockingQueue {
/**
* 生产和消费业务操作
*
*
*/
protected class WorkDesk {
BlockingQueue<String> desk = new LinkedBlockingQueue<String>(8);
public void work() throws InterruptedException {
Thread.sleep(1000);
desk.put("端出一道菜");
}
public String eat() throws InterruptedException {
Thread.sleep(4000);
return desk.take();
}
}
/**
* 生产者类
*
*
*/
class Producer implements Runnable {
private String producerName;
private WorkDesk workDesk;
public Producer(String producerName, WorkDesk workDesk) {
this.producerName = producerName;
this.workDesk = workDesk;
}
@Override
public void run() {
try {
for (;;) {
workDesk.work();
System.out.println(producerName + "端出一道菜" +",Data:"+System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消费者类
*
*
*/
class Consumer implements Runnable {
private String consumerName;
private WorkDesk workDesk;
public Consumer(String consumerName, WorkDesk workDesk) {
this.consumerName = consumerName;
this.workDesk = workDesk;
}
@Override
public void run() {
try {
for (;;) {
workDesk.eat();
System.out.println(consumerName + "端走了一个菜"+",Data:"+System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws InterruptedException {
TestBlockingQueue testQueue = new TestBlockingQueue();
WorkDesk workDesk = testQueue.new WorkDesk();
ExecutorService service = Executors.newFixedThreadPool(6);
//四个生产者线程
for (int i=1;i<=4;++i) {
service.submit(testQueue.new Producer("食堂窗口-"+ i+"-", workDesk));
}
//两个消费者线程
Consumer consumer1 = testQueue.new Consumer("顾客-1-", workDesk);
Consumer consumer2 = testQueue.new Consumer("顾客-2-", workDesk);
service.submit(consumer1);
service.submit(consumer2);
service.shutdown();
}
}
结果部分如下