Java并发之阻塞队列浅析(4)

可以看到take的实现跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。(等的就是上文的put中的信号)当数组的数量为空时,也就是无任何数据可以被取出来的时候,notEmpty这个Condition就会进行阻塞,直到被notEmpty唤醒

dequeue的实现如下

private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }

take方法主要是从队列头部取元素,可以看到takeIndex是取元素的时候的偏移值,而put中是putIndex控制添加元素的偏移量,由此可见,put和take操作的偏移量分别是由putIndex和takeIndex控制的。其实仔细观察put和take的实现思路是有很多相似之处。

offer(E o, long timeout, TimeUnit unit)的实现方式其实和put的思想是差不多的区别是 offer在阻塞的时候调用的不是await()方法而是awaitNanos(long nanosTimeout) 带超时响应的等待(PS:具体区别可以参考我之前写的关于锁的博客《JAVA并发之锁的使用浅析》)

poll(long timeout, TimeUnit unit)的实现也是这样在take的基础上加了超时响应。感兴趣的朋友可以自行去看一下

案例分析

模拟食堂的经历,食堂窗口端出一道菜放在台面,然后等待顾客消费。写到代码里就是食堂窗口就是一个生产者线程,顾客就是消费者线程,台面就是阻塞队列。

public class TestBlockingQueue {
  /**
  * 生产和消费业务操作
  *
  *
  */
  protected class WorkDesk {
 
    BlockingQueue<String> desk = new LinkedBlockingQueue<String>(8);
 
    public void work() throws InterruptedException {
      Thread.sleep(1000);
      desk.put("端出一道菜");
    }
 
    public String eat() throws InterruptedException {
      Thread.sleep(4000);
      return desk.take();
    }
 
  }
 
  /**
  * 生产者类
  *
  *
  */
  class Producer implements Runnable {
 
    private String producerName;
    private WorkDesk workDesk;
 
    public Producer(String producerName, WorkDesk workDesk) {
      this.producerName = producerName;
      this.workDesk = workDesk;
    }
 
    @Override
    public void run() {
      try {
        for (;;) {
 
          workDesk.work();
          System.out.println(producerName + "端出一道菜" +",Data:"+System.currentTimeMillis());
 
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
 
  /**
  * 消费者类
  *
  *
  */
  class Consumer implements Runnable {
    private String consumerName;
    private WorkDesk workDesk;
 
    public Consumer(String consumerName, WorkDesk workDesk) {
      this.consumerName = consumerName;
      this.workDesk = workDesk;
    }
 
    @Override
    public void run() {
      try {
        for (;;) {
          workDesk.eat();
          System.out.println(consumerName + "端走了一个菜"+",Data:"+System.currentTimeMillis());
 
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
 
  public static void main(String args[]) throws InterruptedException {
    TestBlockingQueue testQueue = new TestBlockingQueue();
    WorkDesk workDesk = testQueue.new WorkDesk();
 
    ExecutorService service = Executors.newFixedThreadPool(6);
    //四个生产者线程
    for (int i=1;i<=4;++i) {
      service.submit(testQueue.new Producer("食堂窗口-"+ i+"-", workDesk));
    }
 
    //两个消费者线程
    Consumer consumer1 = testQueue.new Consumer("顾客-1-", workDesk);
    Consumer consumer2 = testQueue.new Consumer("顾客-2-", workDesk);
 
    service.submit(consumer1);
    service.submit(consumer2);
    service.shutdown();
  }
 
}

结果部分如下

Java并发之阻塞队列浅析

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/89ef0d69516bc50721abf2d5fae64759.html