Java线程中生产者与消费者的问题

  生产者与消费者问题是一个金典的多线程协作的问题.生产者负责生产产品,并将产品存放到仓库;消费者从仓库中获取产品并消费。当仓库满时,生产者必须停止生产,直到仓库有位置存放产品;当仓库空时,消费者必须停止消费,直到仓库中有产品。

  解决生产者/消费者问题主要用到如下几个技术:1.用线程模拟生产者,在run方法中不断地往仓库中存放产品。2.用线程模拟消费者,在run方法中不断地从仓库中获取产品。3.仓库类保存产品,当产品数量为0时,调用wait方法,使得当前消费者线程进入等待状态,当有新产品存入时,调用notify方法,唤醒等待的消费者线程。当仓库满时,调用wait方法,使得当前生产者线程进入等待状态,当有消费者获取产品时,调用notify方法,唤醒等待的生产者线程。

二、实例

package book.thread.product;

public class Consumer extends Thread{
  private Warehouse warehouse;//消费者获取产品的仓库
   private boolean running = false;//是否需要结束线程的标志位
   public Consumer(Warehouse warehouse,String name){
    super(name);
    this.warehouse = warehouse;
  }
  public void start(){
    this.running = true;
    super.start();
  }
  public void run(){
    Product product;
    try {
      while(running){
        //从仓库中获取产品
         product = warehouse.getProduct();
        sleep(500);
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  //停止消费者线程
   public void stopConsumer(){
    synchronized(warehouse){
      this.running = false;
      warehouse.notifyAll();//通知等待仓库的线程
     }
  }
  //消费者线程是否在运行
   public boolean isRunning(){
    return running;
  }
}

package book.thread.product;

public class Producer extends Thread{
   private Warehouse warehouse;//生产者存储产品的仓库
   private static int produceName = 0;//产品的名字
   private boolean running = false;//是否需要结束线程的标志位

  public Producer(Warehouse warehouse,String name){
    super(name);
    this.warehouse = warehouse;
  }
  public void start(){
    this.running = true;
    super.start();
  }
  public void run(){
    Product product;
    //生产并存储产品
     try {
    while(running){
      product = new Product((++produceName)+"");
      this.warehouse.storageProduct(product);
      sleep(300);
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  //停止生产者线程
   public void stopProducer(){
    synchronized(warehouse){
      this.running = false;
      //通知等待仓库的线程
       warehouse.notifyAll();
    }
  }
  //生产者线程是否在运行
   public boolean isRunning(){
    return running;
  }
}

package book.thread.product;

public class Product {
  private String name;//产品名
   public Product(String name){
    this.name = name;
  }
  public String toString(){
    return "Product-"+name;
  }
}

package book.thread.product;

//产品的仓库类,内部采用数组来表示循环队列,以存放产品
public class Warehouse {
  private static int CAPACITY = 11;//仓库的容量
   private Product[] products;//仓库里的产品
   //[front,rear]区间的产品未被消费
   private int front = 0;//当前仓库中第一个未被消费的产品的下标
   private int rear = 0;//仓库中最后一个未被消费的产品下标加1
  public Warehouse(){
    this.products = new Product[CAPACITY];
  }
  public Warehouse(int capacity){
    this();
    if(capacity > 0){
      CAPACITY = capacity +1;
      this.products = new Product[CAPACITY];
    }
  }

  //从仓库获取一个产品
   public Product getProduct() throws InterruptedException{
    synchronized(this){
      boolean consumerRunning = true;//标志消费者线程是否还在运行
       Thread currentThread = Thread.currentThread();//获取当前线程
       if(currentThread instanceof Consumer){
        consumerRunning = ((Consumer)currentThread).isRunning();
      }else{
        return null;//非消费者不能获取产品
       }
      //若消费者线程在运行中,但仓库中没有产品了,则消费者线程继续等待
       while((front==rear) && consumerRunning){
        wait();
        consumerRunning = ((Consumer)currentThread).isRunning();
      }
      //如果消费者线程已经停止运行,则退出该方法,取消获取产品
       if(!consumerRunning){
        return null;
      }
      //获取当前未被消费的第一个产品
       Product product = products[front];
      System.out.println("Consumer[" + currentThread.getName()+"] getProduct:"+product);
      //将当前未被消费产品的下标后移一位,如果到了数组末尾,则移动到首部
       front = (front+1+CAPACITY)%CAPACITY;
      System.out.println("仓库中还没有被消费的产品数量:"+(rear+CAPACITY-front)%CAPACITY);
      //通知其他等待线程
       notify();
      return product;
    }
  }
  //向仓库存储一个产品
   public void storageProduct(Product product) throws InterruptedException{
  synchronized(this){
    boolean producerRunning = true;//标志生产者线程是否在运行
     Thread currentThread = Thread.currentThread();
    if(currentThread instanceof Producer){
      producerRunning = ((Producer)currentThread).isRunning();
    }else{
      return;
    }
    //如果最后一个未被消费的产品与第一个未被消费的产品的下标紧挨着,则说明没有存储空间了。
     //如果没有存储空间了,而生产者线程还在运行,则生产者线程等待仓库释放产品
     while(((rear+1)%CAPACITY == front) && producerRunning){
      wait();
      producerRunning = ((Producer)currentThread).isRunning();
    }
    //如果生产线程已经停止运行了,则停止产品的存储
     if(!producerRunning){
      return;
    }
    //保存产品到仓库
     products[rear] = product;
    System.out.println("Producer[" + Thread.currentThread().getName()+"] storageProduct:" + product);
    //将rear下标循环后移一位
     rear = (rear + 1)%CAPACITY;
    System.out.println("仓库中还没有被消费的产品数量:"+(rear + CAPACITY -front)%CAPACITY);
    notify();
    }
  }
}

package book.thread.product;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/84858404d5126ce1a7f440b29dbe828e.html