KClient: Kafka Messaging Middleware Source Code Walkthrough (4)

If the configured thread count is not greater than 0, the thread pool is constructed from minThreadNum and maxThreadNum.

syncThreadPool = new ThreadPoolExecutor(minThreadNum, maxThreadNum, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
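For context, the selection logic around this call presumably looks like the sketch below; the names fixedThreadNum, minThreadNum, and maxThreadNum come from the article, but the exact branch is an assumption:

// Hypothetical reconstruction, not KClient's verbatim code.
if (fixedThreadNum > 0) {
    // A positive count selects a fixed-size pool.
    syncThreadPool = Executors.newFixedThreadPool(fixedThreadNum);
} else {
    // Otherwise a cached-style pool bounded by min/max: the SynchronousQueue
    // hands each task straight to a thread, growing the pool up to maxThreadNum.
    syncThreadPool = new ThreadPoolExecutor(minThreadNum, maxThreadNum, 60L,
            TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}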

Executors Overview

Here are the four kinds of thread pools provided by Executors; a sketch of how they map onto ThreadPoolExecutor follows the list.

newCachedThreadPool creates a cacheable thread pool. If the pool grows beyond what is needed, idle threads are reclaimed flexibly; if no idle thread can be reused, a new one is created.

newFixedThreadPool creates a fixed-size thread pool, which caps the maximum number of concurrent threads; excess tasks wait in a queue.

newScheduledThreadPool creates a fixed-size thread pool that supports delayed and periodic task execution.

newSingleThreadExecutor creates a single-threaded pool that uses exactly one worker thread to execute tasks, guaranteeing that all tasks execute in the specified order (FIFO, LIFO, or by priority).
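For reference, the JDK implements these factories as thin wrappers around ThreadPoolExecutor, roughly as follows (simplified from the java.util.concurrent.Executors source; the single-thread variant is additionally wrapped in a delegate that prevents reconfiguration):

// imports: java.util.concurrent.*
int n = 4; // illustrative size

// newCachedThreadPool: no core threads, effectively unbounded growth, 60s idle reclaim
ExecutorService cached = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

// newFixedThreadPool(n): n permanent threads in front of an unbounded queue
ExecutorService fixed = new ThreadPoolExecutor(n, n,
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

// newSingleThreadExecutor: the n == 1 case of the above
ExecutorService single = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

// newScheduledThreadPool(n): a ScheduledThreadPoolExecutor, itself a ThreadPoolExecutor subclass
ScheduledExecutorService scheduled = new ScheduledThreadPoolExecutor(n);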

ThreadPoolExecutor Overview

The relationship between ThreadPoolExecutor and Executor is as follows: ThreadPoolExecutor extends AbstractExecutorService, which implements the ExecutorService interface, which in turn extends the Executor interface.


构造方法:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

Parameter descriptions:

corePoolSize

The number of core threads. By default, core threads stay alive even when idle and are not subject to the keepAliveTime limit, unless allowCoreThreadTimeOut is set to true.

maximumPoolSize

The maximum number of threads the pool can hold. Once this limit is reached and the work queue is full, additional tasks are rejected. When the work queue is an unbounded LinkedBlockingQueue, this value has no effect.

keepAliveTime

The idle timeout for non-core threads; a thread idle longer than this is reclaimed.

unit

The time unit of keepAliveTime, e.g. TimeUnit.SECONDS. When allowCoreThreadTimeOut is set to true, keepAliveTime also applies to the core threads.

workQueue

The task queue of the thread pool.

Three queues are commonly used: SynchronousQueue, LinkedBlockingQueue, and ArrayBlockingQueue.
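The three differ mainly in capacity, which in turn decides when the pool creates extra threads; a quick sketch (the capacity of 100 is illustrative):

// imports: java.util.concurrent.*

// SynchronousQueue holds nothing: each offer must meet a take, so every
// submission either reuses an idle thread or spawns one (up to maximumPoolSize).
BlockingQueue<Runnable> direct = new SynchronousQueue<Runnable>();

// LinkedBlockingQueue is unbounded by default: tasks wait indefinitely and the
// pool never grows beyond corePoolSize.
BlockingQueue<Runnable> unbounded = new LinkedBlockingQueue<Runnable>();

// ArrayBlockingQueue is bounded: once full, the pool grows toward
// maximumPoolSize, and beyond that new tasks are rejected.
BlockingQueue<Runnable> bounded = new ArrayBlockingQueue<Runnable>(100);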

threadFactory

The thread factory, which provides the ability to create new threads.

RejectedExecutionHandler

When all of the pool's resources are in use and a newly submitted task is rejected, the rejectedExecution method of the RejectedExecutionHandler is invoked. A combined construction example follows.
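Tying the parameters together, here is a minimal self-contained sketch (the sizes, queue capacity, and rejection policy are illustrative choices, not taken from KClient):

import java.util.concurrent.*;

public class PoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                   // corePoolSize
                4,                                   // maximumPoolSize
                60L, TimeUnit.SECONDS,               // keepAliveTime + unit
                new ArrayBlockingQueue<Runnable>(8), // bounded workQueue
                Executors.defaultThreadFactory(),    // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejected tasks run on the caller

        for (int i = 0; i < 20; i++) {
            final int id = i;
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("task " + id + " on "
                            + Thread.currentThread().getName());
                }
            });
        }
        pool.shutdown();
    }
}

With 20 submissions against at most 4 threads and a queue of 8, CallerRunsPolicy kicks in and runs the overflow tasks on the main thread instead of throwing RejectedExecutionException.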

initKafka

Since the Kafka API has changed a great deal since this code was written, the Kafka operations here are for reference only and are not covered in detail.

1. Load the consumer configuration

ConsumerConfig config = new ConsumerConfig(properties);

2. Create the consumerConnector connection

consumerConnector = Consumer.createJavaConsumerConnector(config);

3. Record the kafka topic and the number of message streams configured for it

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put(topic, streamNum);

4. Fetch the message streams from kafka

Map<String, List<KafkaStream<String, String>>> streamsMap = consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder);
streams = streamsMap.get(topic);

5. Create the message-processing thread pool (see the combined sketch below)
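Assembled, the five steps look roughly like the sketch below, written against the legacy Kafka 0.8 high-level consumer API; the property values and topic name are illustrative, not KClient's actual configuration:

// imports: java.util.*, java.util.concurrent.*, kafka.consumer.*,
//          kafka.javaapi.consumer.ConsumerConnector, kafka.serializer.StringDecoder

Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181"); // illustrative
properties.put("group.id", "kclient-demo");            // illustrative

// 1. Load the consumer configuration
ConsumerConfig config = new ConsumerConfig(properties);

// 2. Create the consumer connector
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);

// 3. Map the topic to the number of streams to consume it with
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("demo-topic", 2);

// 4. Fetch the streams, decoding keys and values as strings
StringDecoder decoder = new StringDecoder(null);
Map<String, List<KafkaStream<String, String>>> streamsMap =
        consumerConnector.createMessageStreams(topics, decoder, decoder);
List<KafkaStream<String, String>> streams = streamsMap.get("demo-topic");

// 5. One pool thread per stream, so each stream gets a dedicated consuming task
ExecutorService streamThreadPool = Executors.newFixedThreadPool(streams.size());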

startup()

The init() above covered the initialization of the kafka consumer; startup() is the core code with which KafkaConsumer actually performs consumption.

1. Process the messages in each of the message streams in streams in turn

for (KafkaStream<String, String> stream : streams) {

2. Create the message task

AbstractMessageTask abstractMessageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(stream, handler) : new ConcurrentMessageTask(stream, handler, fixedThreadNum));

3. Add the task to tasks so that it can be conveniently stopped on shutdown

tasks.add(abstractMessageTask);

4. Execute the task

streamThreadPool.execute(abstractMessageTask);
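Put together, steps 1 through 4 presumably form the consuming loop inside startup():

// Hypothetical assembly of the loop, reconstructed from the steps above.
for (KafkaStream<String, String> stream : streams) {
    AbstractMessageTask abstractMessageTask = (fixedThreadNum == 0
            ? new SequentialMessageTask(stream, handler)
            : new ConcurrentMessageTask(stream, handler, fixedThreadNum));
    tasks.add(abstractMessageTask);
    streamThreadPool.execute(abstractMessageTask);
}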

AbstractMessageTask

The abstract class for task execution. Its core function is to continuously pull messages from the message stream and consume them. The complete code follows and is not walked through in detail:

abstract class AbstractMessageTask implements Runnable {
    protected KafkaStream<String, String> stream;
    protected MessageHandler messageHandler;

    AbstractMessageTask(KafkaStream<String, String> stream,
            MessageHandler messageHandler) {
        this.stream = stream;
        this.messageHandler = messageHandler;
    }

    public void run() {
        ConsumerIterator<String, String> it = stream.iterator();
        while (status == Status.RUNNING) {
            boolean hasNext = false;
            try {
                // When it is interrupted because the process is killed, some
                // duplicate message processing can occur, because offsets are
                // committed in a chunk every 30 seconds
                hasNext = it.hasNext();
            } catch (Exception e) {
                // hasNext() is implemented in Scala, so no checked exception is
                // declared; in addition, hasNext() may throw InterruptedException
                // when interrupted, so we have to catch Exception here and then
                // decide whether it is an interrupted exception
                if (e instanceof InterruptedException) {
                    log.info(
                            "The worker [Thread ID: {}] has been interrupted when retrieving messages from kafka broker. Maybe the consumer is shutting down.",
                            Thread.currentThread().getId());
                    log.error("Retrieve Interrupted: ", e);
                    if (status != Status.RUNNING) {
                        it.clearCurrentChunk();
                        shutdown();
                        break;
                    }
                } else {
                    log.error(
                            "The worker [Thread ID: {}] encounters an unknown exception when retrieving messages from kafka broker. Now try again.",
                            Thread.currentThread().getId());
                    log.error("Retrieve Error: ", e);
                    continue;
                }
            }
            if (hasNext) {
                MessageAndMetadata<String, String> item = it.next();
                log.debug("partition[" + item.partition() + "] offset["
                        + item.offset() + "] message[" + item.message() + "]");
                handleMessage(item.message());
                // if not auto commit, commit it manually
                if (!isAutoCommitOffset) {
                    consumerConnector.commitOffsets();
                }
            }
        }
    }

    protected void shutdown() {
        // Actually this doesn't help in auto commit mode, because kafka v0.8
        // commits once per 30 seconds, so duplicate messages are bound to be consumed.
        stream.clear();
    }

    protected abstract void handleMessage(String message);
}

SequentialMessageTask && ConcurrentMessageTask
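The original post ends at this heading. Judging from the class names and the fixedThreadNum switch in startup(), the two subclasses presumably differ as sketched below; this is a reconstruction, and the handler method name execute is hypothetical:

// Hypothetical reconstruction: handles each message inline on the stream's own
// task thread, preserving message order within the stream.
class SequentialMessageTask extends AbstractMessageTask {
    SequentialMessageTask(KafkaStream<String, String> stream,
            MessageHandler messageHandler) {
        super(stream, messageHandler);
    }

    protected void handleMessage(String message) {
        messageHandler.execute(message); // hypothetical method name
    }
}

// Hypothetical reconstruction: dispatches each message to an inner fixed-size
// pool, trading per-stream ordering for throughput.
class ConcurrentMessageTask extends AbstractMessageTask {
    private final ExecutorService executor;

    ConcurrentMessageTask(KafkaStream<String, String> stream,
            MessageHandler messageHandler, int fixedThreadNum) {
        super(stream, messageHandler);
        this.executor = Executors.newFixedThreadPool(fixedThreadNum);
    }

    protected void handleMessage(final String message) {
        executor.execute(new Runnable() {
            public void run() {
                messageHandler.execute(message); // hypothetical method name
            }
        });
    }
}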
