Java Concurrency Basics: Concurrency Utility Classes (Part 2) (3)

LinkedBlockingDeque: a double-ended blocking queue backed by a linked list.
As a double-ended queue, BlockingDeque also provides the following methods for the head element:

First Element (Head)
          Throws exception   Special value   Blocks           Times out
Insert    addFirst(e)        offerFirst(e)   putFirst(e)      offerFirst(e, time, unit)
Remove    removeFirst()      pollFirst()     takeFirst()      pollFirst(time, unit)
Examine   getFirst()         peekFirst()     not applicable   not applicable
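
The four columns of the head-element table can be tried out on a small bounded deque. The following is a minimal sketch, not code from the original article: the class name HeadOperationsDemo, the capacity of 2, and the 50 ms timeout are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: exercises each style of head operation once.
class HeadOperationsDemo {

    static List<String> run() {
        List<String> results = new ArrayList<>();
        // Capacity 2 is an arbitrary choice so that the "full" cases are easy to reach.
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(2);
        try {
            deque.putFirst("a"); // "Blocks" column: would wait only if the deque were full
            results.add("offerFirst=" + deque.offerFirst("b"));        // room left, returns true
            results.add("offerFirst(full)=" + deque.offerFirst("c"));  // full, returns false at once
            // "Times out" column: waits up to 50 ms for space, then gives up.
            results.add("offerFirst(timeout)=" + deque.offerFirst("c", 50, TimeUnit.MILLISECONDS));
            try {
                deque.addFirst("c"); // "Throws exception" column
            } catch (IllegalStateException e) {
                results.add("addFirst(full) threw IllegalStateException");
            }
            results.add("peekFirst=" + deque.peekFirst()); // examines the head without removing it
            results.add("takeFirst=" + deque.takeFirst()); // removes "b", the most recent head
            results.add("pollFirst=" + deque.pollFirst()); // removes "a"
            results.add("pollFirst(empty)=" + deque.pollFirst()); // empty, returns null
            try {
                deque.removeFirst(); // empty deque
            } catch (NoSuchElementException e) {
                results.add("removeFirst(empty) threw NoSuchElementException");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return results;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "special value" methods report failure through their return value (false or null), which is usually the easiest style to use when the caller has something else to do instead of waiting.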

For the tail element:

Last Element (Tail)
          Throws exception   Special value   Blocks           Times out
Insert    addLast(e)         offerLast(e)    putLast(e)       offerLast(e, time, unit)
Remove    removeLast()       pollLast()      takeLast()       pollLast(time, unit)
Examine   getLast()          peekLast()      not applicable   not applicable
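
Because elements can be removed from either end, the same deque can serve as a FIFO queue (insert at the tail, remove from the head) or as a LIFO stack (insert and remove at the tail). The sketch below illustrates this with the tail methods from the table; the class name TailOperationsDemo and the 50 ms timeout are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: inserts at the tail, then removes from both ends.
class TailOperationsDemo {

    static List<String> run() {
        List<String> results = new ArrayList<>();
        // No capacity argument: the deque is effectively unbounded (Integer.MAX_VALUE).
        BlockingDeque<Integer> deque = new LinkedBlockingDeque<>();
        try {
            deque.putLast(1);
            deque.putLast(2);
            deque.offerLast(3); // deque is now [1, 2, 3]
            results.add("peekLast=" + deque.peekLast());     // looks at 3 without removing it
            results.add("takeLast=" + deque.takeLast());     // 3: LIFO order
            results.add("takeFirst=" + deque.takeFirst());   // 1: FIFO order
            results.add("removeLast=" + deque.removeLast()); // 2: deque is now empty
            // Timed poll on an empty deque: waits up to 50 ms, then returns null.
            results.add("pollLast(timeout)=" + deque.pollLast(50, TimeUnit.MILLISECONDS));
            try {
                deque.getLast(); // empty deque
            } catch (NoSuchElementException e) {
                results.add("getLast(empty) threw NoSuchElementException");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return results;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```
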
Usage example

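A typical producer-consumer example follows. A BlockingQueue can safely be shared by multiple producers and multiple consumers. Each Producer thread calls NumerGenerator.getNextNumber() to obtain an auto-incrementing integer and puts three such numbers into the queue, while each Consumer takes elements from the queue in a loop.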

package com.aidodoo.java.concurrent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by zhangkh on 2018/7/17.
 */
public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue with fairness enabled: waiting threads are served in FIFO order.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024, true);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 5; i++) {
            executorService.submit(new Producer(queue));
        }
        for (int i = 0; i < 3; i++) {
            executorService.submit(new Consumer(queue));
        }
        Thread.sleep(30 * 1000L);
        // shutdown() alone would leave the consumers blocked in take() forever;
        // shutdownNow() interrupts them so that the JVM can exit.
        executorService.shutdownNow();
    }
}

class Producer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class.getName());
    protected final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 3; i++) {
                int num = NumerGenerator.getNextNumber();
                queue.put(num); // blocks while the queue is full
                Thread.sleep(1000);
                // Logged after the sleep, so a consumer may report the element first.
                logger.info("{} producer put {}", Thread.currentThread().getName(), num);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and stop
        }
    }
}

class Consumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class.getName());
    protected final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int ele = queue.take(); // blocks while the queue is empty
                logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted by shutdownNow(): exit the loop
        }
    }
}

class NumerGenerator {
    private static final AtomicInteger count = new AtomicInteger();

    // Returns 1, 2, 3, ... and is safe to call from several threads at once.
    public static int getNextNumber() {
        return count.incrementAndGet();
    }
}

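The program output is as follows. Because each producer logs only after its one-second sleep, a "take" line can appear before the matching "put" line, and the exact interleaving varies from run to run: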

18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15

For the other BlockingQueue implementations, refer to the corresponding Java API documentation.
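
Since producers and consumers depend only on the BlockingQueue interface, trying another implementation usually means changing just the constructor call. The sketch below is an illustration under assumptions, not code from the original article: the class QueueSwapDemo and the helper sumThrough are hypothetical names, and the four implementations passed in are standard java.util.concurrent classes picked as examples.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class: runs the same hand-off over different queue implementations.
class QueueSwapDemo {

    // One producer thread puts 1..n; the calling thread takes n elements and sums them.
    static int sumThrough(BlockingQueue<Integer> queue, int n) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks when the queue has no free capacity
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take(); // blocks until an element is available
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        // Only the constructor differs; sumThrough itself is unchanged.
        System.out.println(sumThrough(new ArrayBlockingQueue<>(2), 10));
        System.out.println(sumThrough(new LinkedBlockingQueue<>(), 10));
        System.out.println(sumThrough(new LinkedBlockingDeque<>(), 10));
        System.out.println(sumThrough(new SynchronousQueue<>(), 10));
    }
}
```

Each call returns 55 (the sum of 1 through 10). What differs between implementations is capacity, ordering, and hand-off behavior; for example, SynchronousQueue holds no elements at all, so every put waits for a matching take.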

Source code analysis
