如果线程数不大于0,使用minThreadNum,maxThreadNum进行构造线程池。
syncThreadPool = new ThreadPoolExecutor(minThreadNum, maxThreadNum, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); Executors简介这里介绍Executors提供的四种线程池
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
ThreadPoolExecutor简介ThreadPooExecutor与Executor的关系如下:
构造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue); ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)参数说明:
corePoolSize
核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true。
maximumPoolSize
线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。
keepAliveTime
非核心线程的闲置超时时间,超过这个时间就会被回收。
unit
指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。
workQueue
线程池中的任务队列.
常用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。
SynchronousQueue
线程工厂,提供创建新线程的功能。
RejectedExecutionHandler
当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution方法。
initKafka由于kafka API已经改动很多,所以这里关于Kafka的操作仅做参考,不会详细介绍。
1.加载Consumer配置
ConsumerConfig config = new ConsumerConfig(properties);2.创建consumerConnector连接
consumerConnector = Consumer.createJavaConsumerConnector(config);3.存储kafka topic与对应设置的消息流数量
Map<String, Integer> topics = new HashMap<String, Integer>(); topics.put(topic, streamNum);4.从kafka获取消息流
Map<String, List<KafkaStream<String, String>>> streamsMap = consumerConnector .createMessageStreams(topics, keyDecoder, valueDecoder); streams = streamsMap.get(topic);5.创建消息处理线程池
startup()上述init()主要介绍了kafka消费者的初始化,而startup()则是kafkaConsumer作为消费者进行消费动作的核心功能代码。
1.依次处理消息线程streams中的消息
for (KafkaStream<String, String> stream : streams) {2.创建消息任务
AbstractMessageTask abstractMessageTask = (fixedThreadNum == 0 ? new SequentialMessageTask( stream, handler) : new ConcurrentMessageTask(stream, handler, fixedThreadNum));3.添加到tasks中,以方便关闭进程
tasks.add(abstractMessageTask);4.执行任务
streamThreadPool.execute(abstractMessageTask); AbstractMessageTask任务执行的抽象类,核心功能如下从消息线程池中不断获取消息,进行消费。
下面是完整代码,不再详细介绍: