LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
BlockingDeque作为双端队列,针对头部元素,还提供了如下方法:
Throws exception Special value Blocks Times out
Insert addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
Remove removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
Examine getFirst() peekFirst() not applicable not applicable
针对尾部元素
Last Element (Tail)Throws exception Special value Blocks Times out
Insert addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
Remove removeLast() pollLast() takeLast() pollLast(time, unit)
Examine getLast() peekLast() not applicable not applicable
使用示例
一个典型的生产者和消费者实例如下,一个BlockingQueue可以安全地与多个生产者和消费者一起使用,Producer线程调用NumerGenerator.getNextNumber()生成自增整数,不断地写入数字,然后Consumer循环消费。
package com.aidodoo.java.concurrent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * Created by zhangkh on 2018/7/17. */ public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue queue = new ArrayBlockingQueue(1024,true); ExecutorService executorService = Executors.newFixedThreadPool(20); for (int i = 0; i < 5; i++) { executorService.submit(new Producer(queue)); } for (int i = 0; i < 3; i++) { executorService.submit(new Consumer(queue)); } Thread.sleep(30 * 1000L); executorService.shutdown(); } } class Producer implements Runnable { Logger logger = LoggerFactory.getLogger(Producer.class.getName()); protected BlockingQueue queue = null; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { for(int i=0;i<3;i++){ int num = NumerGenerator.getNextNumber(); queue.put(num); Thread.sleep(1000); logger.info("{} producer put {}", Thread.currentThread().getName(), num); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer implements Runnable { Logger logger = LoggerFactory.getLogger(Consumer.class.getName()); protected BlockingQueue queue = null; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { int ele = (int) queue.take(); logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } class NumerGenerator{ private static AtomicInteger count = new AtomicInteger(); public static Integer getNextNumber(){ return count.incrementAndGet(); } }程序输出如下:
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1 18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2 18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15其他BlockingQueue子类的使用可参考对应的Java Api。
源码分析