工作中常用到的Java集合类有哪些? (2)

工作中常用到的Java集合类有哪些?

工作中常用到的Java集合类有哪些?

Queue队列

不知道大家有没有学过生产者和消费者模式,秋招面试的时候可能会让你手写一段这样的代码。最简单的方式就是用阻塞队列去写。类似下面:

生产者:

import java.util.Random; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { // true--->生产者一直执行,false--->停掉生产者 private volatile boolean isRunning = true; // 公共资源 private final Vector sharedQueue; // 公共资源的最大数量 private final int SIZE; // 生产数据 private static AtomicInteger count = new AtomicInteger(); public Producer(Vector sharedQueue, int SIZE) { this.sharedQueue = sharedQueue; this.SIZE = SIZE; } @Override public void run() { int data; Random r = new Random(); System.out.println("start producer id = " + Thread.currentThread().getId()); try { while (isRunning) { // 模拟延迟 Thread.sleep(r.nextInt(1000)); // 当队列满时阻塞等待 while (sharedQueue.size() == SIZE) { synchronized (sharedQueue) { System.out.println("Queue is full, producer " + Thread.currentThread().getId() + " is waiting, size:" + sharedQueue.size()); sharedQueue.wait(); } } // 队列不满时持续创造新元素 synchronized (sharedQueue) { // 生产数据 data = count.incrementAndGet(); sharedQueue.add(data); System.out.println("producer create data:" + data + ", size:" + sharedQueue.size()); sharedQueue.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupted(); } } public void stop() { isRunning = false; } }

消费者:

import java.util.Random; import java.util.Vector; public class Consumer implements Runnable { // 公共资源 private final Vector sharedQueue; public Consumer(Vector sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { Random r = new Random(); System.out.println("start consumer id = " + Thread.currentThread().getId()); try { while (true) { // 模拟延迟 Thread.sleep(r.nextInt(1000)); // 当队列空时阻塞等待 while (sharedQueue.isEmpty()) { synchronized (sharedQueue) { System.out.println("Queue is empty, consumer " + Thread.currentThread().getId() + " is waiting, size:" + sharedQueue.size()); sharedQueue.wait(); } } // 队列不空时持续消费元素 synchronized (sharedQueue) { System.out.println("consumer consume data:" + sharedQueue.remove(0) + ", size:" + sharedQueue.size()); sharedQueue.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }

Main方法测试:

import java.util.Vector; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test2 { public static void main(String[] args) throws InterruptedException { // 1.构建内存缓冲区 Vector sharedQueue = new Vector(); int size = 4; // 2.建立线程池和线程 ExecutorService service = Executors.newCachedThreadPool(); Producer prodThread1 = new Producer(sharedQueue, size); Producer prodThread2 = new Producer(sharedQueue, size); Producer prodThread3 = new Producer(sharedQueue, size); Consumer consThread1 = new Consumer(sharedQueue); Consumer consThread2 = new Consumer(sharedQueue); Consumer consThread3 = new Consumer(sharedQueue); service.execute(prodThread1); service.execute(prodThread2); service.execute(prodThread3); service.execute(consThread1); service.execute(consThread2); service.execute(consThread3); // 3.睡一会儿然后尝试停止生产者(结束循环) Thread.sleep(10 * 1000); prodThread1.stop(); prodThread2.stop(); prodThread3.stop(); // 4.再睡一会儿关闭线程池 Thread.sleep(3000); // 5.shutdown()等待任务执行完才中断线程(因为消费者一直在运行的,所以会发现程序无法结束) service.shutdown(); } }

我的项目用阻塞队列也挺多的(我觉得跟个人编写的代码风格习惯有关),类似实现了上面的生产者和消费者模式。

真实场景例子:

运营要发一条推送消息,首先需要去用户画像系统圈选一个人群,填写对应的人群ID和发送时间。

我通过时间调度,通过RPC拿到人群的信息。遍历HDFS得到这个人群的每个userId

将遍历的userId放到一个阻塞队列里边去,用多个线程while(true)取阻塞队列的数据

好处是什么?我在取userId的时候,会有个限制:要么超出了指定的时间,要么达到BatchSize的值。这样我就可以将相同内容的不同userId组成一个Task

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwxgyg.html