内存缓冲区:要保证在多线程环境下内存缓冲区的安全,所以我们考虑使用简单的Vector类来作为我们的内存缓冲区,并且使用final修饰保证内存缓冲区的唯一,然后的话我们需要判断队列是否满,需要手动添加一个标识缓冲区大小的变量SIZE,注意也是final修饰;
模拟延迟:这里主要模拟的是一个网络延迟,我们首先定义了一个SLEEPTIME的延迟范围,注意使用的是static final修饰,然后使用Random()类的nextInt()方法来随机选取一个该范围内的值来模拟网络环境中的延迟;
停止方法:首先需要知道在Thread类中有一个弃用的stop()方法,我们自己增加一个标志位isRunning来完成我们自己的stop()功能,需要注意的是使用volatile来修饰,保证该标志位的可见性;
错误处理:当捕获到错误时,我们应该使用Thread类中的interrupted()方法来终止当前的进程;
消息提示:我们主要是要在控制台输出该生产者的信息,包括当前队列的状态,大小,当前线程的生产者信息等,注意的是信息格式的统一(后面的消费者同样的);
消费者代码 public class Consumer implements Runnable { private final Vector sharedQueue; // 内存缓冲区 private final int SIZE; // 缓冲区大小 private static final int SLEEPTIME = 1000; public Consumer(Vector sharedQueue, int SIZE) { this.sharedQueue = sharedQueue; this.SIZE = SIZE; } @Override public void run() { Random r = new Random(); System.out.println("start consumer id = " + Thread.currentThread().getId()); try { while (true) { // 模拟延迟 Thread.sleep(r.nextInt(SLEEPTIME)); // 当队列空时阻塞等待 while (sharedQueue.isEmpty()) { synchronized (sharedQueue) { System.out.println("Queue is empty, consumer " + Thread.currentThread().getId() + " is waiting, size:" + sharedQueue.size()); sharedQueue.wait(); } } // 队列不空时持续消费元素 synchronized (sharedQueue) { System.out.println("consumer consume data:" + sharedQueue.remove(0) + ", size:" + sharedQueue.size()); sharedQueue.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }跟生产者相同的,你需要注意内存缓冲区/ 模拟延迟/ 错误处理/ 消息提示这些方面的细节问题,总体来说消费者就是持续不断的消费,也比较容易实现;
主线程代码有了我们的消费者和生产者代码,我们需要来验证一下它们的正确性,照常理来说我们直接创建一些消费者和生产者的线程让它们执行就可以了啊,但是为了“加分”考虑呢,我们还是使用线程池吧..也不是特别复杂:
public static void main(String args[]) throws InterruptedException { // 1.构建内存缓冲区 Vector sharedQueue = new Vector(); int size = 4; // 2.建立线程池和线程 ExecutorService service = Executors.newCachedThreadPool(); Producer prodThread1 = new Producer(sharedQueue, size); Producer prodThread2 = new Producer(sharedQueue, size); Producer prodThread3 = new Producer(sharedQueue, size); Consumer consThread1 = new Consumer(sharedQueue, size); Consumer consThread2 = new Consumer(sharedQueue, size); Consumer consThread3 = new Consumer(sharedQueue, size); service.execute(prodThread1); service.execute(prodThread2); service.execute(prodThread3); service.execute(consThread1); service.execute(consThread2); service.execute(consThread3); // 3.睡一会儿然后尝试停止生产者 Thread.sleep(10 * 1000); prodThread1.stop(); prodThread2.stop(); prodThread3.stop(); // 4.再睡一会儿关闭线程池 Thread.sleep(3000); service.shutdown(); }大家可以自行去看看运行的结果,是没有问题的,不会出现多生产或者多消费之类的多线程问题,运行一段时间等生产者都停止之后,我们可以观察到控制台三个消费者都在等待数据的情况:
Queue is empty, consumer 17 is waiting, size:0 Queue is empty, consumer 15 is waiting, size:0 Queue is empty, consumer 16 is waiting, size:0 BlockingQueue阻塞队列方式实现该方式对比起上面一种方式实现起来要简单一些,因为不需要手动的去通知其他线程了,生产者直接往队列中放数据直到队列满,消费者直接从队列中获取数据直到队列空,BlockingQueue会自动帮我们完成阻塞这个动作,还是先来看看代码
生产者代码 public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<Integer> queue; // 内存缓冲区 private static AtomicInteger count = new AtomicInteger(); // 总数,原子操作 private static final int SLEEPTIME = 1000; public Producer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { int data; Random r = new Random(); System.out.println("start producer id = " + Thread.currentThread().getId()); try { while (isRunning) { // 模拟延迟 Thread.sleep(r.nextInt(SLEEPTIME)); // 往阻塞队列中添加数据 data = count.incrementAndGet(); // 构造任务数据 System.out.println("producer " + Thread.currentThread().getId() + " create data:" + data + ", size:" + queue.size()); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.err.println("failed to put data:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupted(); } } public void stop() { isRunning = false; } }