主类测试
public class ConsumeAndProduce{ public static void main(String[] args) { Producer pro = new Producer(); Consumer con = new Consumer(); Thread t1 = new Thread(pro); Thread t2 = new Thread(con); t1.start(); t2.start(); } }
总结
empty和full两个信号量的作用就在于消费者消费之前检查是否有待消费商品,如果有则让他去消费,没有就要将消费者进程放进等待队列,等到生产者生产了商品后又将其从等待队列中取出。生产者在生产之前需要先检查是否有足够的缓冲区(存放商品的地方),如果有则让其去生产,没有的话就要进入等待队列等待消费者消费缓冲区的商品。
多生产者多消费者(PV操作解决互斥问题)因为有多个生产者和消费者来生产和消费商品,我们需要在Global中加入两个变量pCount、cCount,分别用来表示生产的商品的序号和被消费的商品的序号,之前是因为只有单个生产消费着,所以直接将其定义在run方法中即可,但这是有多个生产者和消费者,所以要放到一个公共区,一起去操作它。但是如果我们有多个生产者和多个消费者,会不会出现线程安全问题?答案是肯定的。生产者重复生产了同一商品
这种情况如何出现的呢,我们先看run方法里的代码
@Override public void run() { while(Global.pCount < 20) { //最多生产20件商品 Global.empty.Wait(); //临界区 int index = Global.pCount % 2; Global.buffer[index] = Global.pCount; System.out.println(Thread.currentThread().getName()+"生产者在缓冲区"+index+"中生产了物品"+Global.pCount); Global.pCount++; try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } Global.full.Signal(); } }假如生产者1号生产了0号商品,但此时他还没做Global.pCount++这一步操作,CPU将执行权切换到生产者2号,这时Global.pCount的值还是刚刚的0,没有加1,所以又会生产出一个0号商品,那消费者也同理,消费完还没加1,就被切换了执行权。
那就有个问题,如果我们将Global.pCount++这一步提前能不能解决问题呢,当然也是不行的,因为可能++完还没输出就被切换执行权,那下次执行权回来时候就会继续执行输出操作,但此时的Global.pCount的值已经不知道加了多少了。
解决方法解决的办法就是加入新的信号量Mutex,将初始值设为1,引起多个生产者之间的互斥,或者多个消费者之间的互斥,即1号生产者操作pCount这个数据的时候,其他生产者无法对pCount进行操作。就是我们说的信号量的第一个应用,解决互斥问题。
package OS; /** * 封装的PV操作类 * @author Vfdxvffd * @count 信号量 */ class syn{ int count = 0; syn(){} syn(int a){count = a;} public synchronized void Wait() { count--; if(count < 0) { //block try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } public synchronized void Signal() { count++; if(count <= 0) { //wakeup notify(); } } } class Global{ static syn empty = new syn(2); //成员变量count表示剩余空闲缓冲区的数量 >0则生产者进程可以执行 static syn full = new syn(0); //成员变量count表示当前待消费物品的数量 >0则消费者进程可以执行 static syn pMutex = new syn(1); //保证生产者之间互斥的信号量 static syn cMutex = new syn(1); //保证消费者之间互斥的信号量 static int[] buffer = new int[2];//缓冲区,就像放面包的盘子 static int pCount = 0; //生产者生产的商品编号 static int cCount = 0; //消费者消费的商品编号 } /** * 生产者类 * @author Vfdxvffd * @count 生产的物品数量标号 * Global.empty.Wait();和Global.pMutex.Wait();的顺序无所谓, * 只要和下面对应即可,要么都包裹在里面,要么都露在外面 */ class Producer implements Runnable{ @Override public void run() { while(Global.pCount < 20) { //最多生产20件商品 Global.empty.Wait(); /*要生产物品了,给剩余空 闲缓冲区数量--,如果减完后变为负数,则说明当前没有 空闲缓冲区,则加入等待队列*/ Global.pMutex.Wait(); /*保证生产者之间的互斥, 就像是加了一个锁一样this.lock()*/ //临界区 int index = Global.pCount % 2; Global.buffer[index] = Global.pCount; System.out.println(Thread.currentThread().getName()+"生产者在缓冲区"+index+"中生产了物品"+Global.pCount); Global.pCount++; try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } Global.pMutex.Signal();//相当于释放锁this.unlock() Global.full.Signal();/*发出一个信号,表示缓冲区已 经有物品了,可以来消费了,成员变量count的值表示缓冲 区的待消费物品的数量,相当于唤醒消费者*/ } } } /** * 消费者类 * @author Vfdxvffd * @count 物品数量标号 * Global.full.Wait();和Global.cMutex.Wait();的顺序无所谓, * 只要和下面对应即可,要么都包裹在里面,要么都露在外面 */ class Consumer implements Runnable{ @Override public void run() { while(Global.cCount < 20) { Global.full.Wait(); /*要消费物品了,给当前待消费 物品--,如果减完为负数,则说明当前没有可消费物品, 加入等待队列*/ Global.cMutex.Wait();//保证消费者之间的互斥 //临界区 int index = Global.cCount % 2; int value = Global.buffer[index]; System.out.println(Thread.currentThread().getName()+"消费者在缓冲区"+index+"中消费了物品"+value); Global.cCount++; try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } Global.cMutex.Signal(); Global.empty.Signal(); /*消费完一个物品后,释放 一个缓冲区,给空闲缓冲区数量++*/ } } } public class ConsumeAndProduce{ public static void main(String[] args) { Producer pro = new Producer(); Consumer con = new Consumer(); Thread t1 = new Thread(pro); Thread t2 = new Thread(con); Thread t3 = new Thread(pro); Thread t4 = new Thread(con); t1.start(); t2.start(); t3.start(); t4.start(); } }