KClient——kafka消息中间件源码解读 (3)

3.如果生产者非空,意味着需要通过生产者程序将结果发送到Kafka中

if (kafkaProducer != null) { if (result instanceof JSONObject) kafkaProducer.send(((JSONObject) result).toJSONString()); else if (result instanceof JSONArray) kafkaProducer.send(((JSONArray) result).toJSONString()); else if (result instanceof Document) kafkaProducer.send(((Document) result).getTextContent()); else kafkaProducer.send(JSON.toJSONString(result)); 生产者和消费者创建方法 protected KafkaConsumer createConsumer( final KafkaHandlerMeta kafkaHandlerMeta, MessageHandler beanMessageHandler) { KafkaConsumer kafkaConsumer = null; if (kafkaHandlerMeta.getInputConsumer().fixedThreadNum() > 0) { kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta .getInputConsumer().propertiesFile(), kafkaHandlerMeta .getInputConsumer().topic(), kafkaHandlerMeta .getInputConsumer().streamNum(), kafkaHandlerMeta .getInputConsumer().fixedThreadNum(), beanMessageHandler); } else if (kafkaHandlerMeta.getInputConsumer().maxThreadNum() > 0 && kafkaHandlerMeta.getInputConsumer().minThreadNum() < kafkaHandlerMeta .getInputConsumer().maxThreadNum()) { kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta .getInputConsumer().propertiesFile(), kafkaHandlerMeta .getInputConsumer().topic(), kafkaHandlerMeta .getInputConsumer().streamNum(), kafkaHandlerMeta .getInputConsumer().minThreadNum(), kafkaHandlerMeta .getInputConsumer().maxThreadNum(), beanMessageHandler); } else { kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta .getInputConsumer().propertiesFile(), kafkaHandlerMeta .getInputConsumer().topic(), kafkaHandlerMeta .getInputConsumer().streamNum(), beanMessageHandler); } return kafkaConsumer; } protected KafkaProducer createProducer( final KafkaHandlerMeta kafkaHandlerMeta) { KafkaProducer kafkaProducer = null; if (kafkaHandlerMeta.getOutputProducer() != null) { kafkaProducer = new KafkaProducer(kafkaHandlerMeta .getOutputProducer().propertiesFile(), kafkaHandlerMeta .getOutputProducer().defaultTopic()); } // It may return null return kafkaProducer; }

这两部分比较简单,不做赘述。

小结

KClientBoot.java实现了:

获取使用KafkaHandlers中定义注释的方法及其它信息

基于反射机制,生成处理函数。

执行处理函数

创建对应Producer和Consumer

还剩余几个比较简单的部分,比如shutdownAll()等方法,将在具体实现处进行补充介绍。

到此,整个项目的主体功能都已经实现。接下来,将分析上文中出现频率最高的kafkaHandlerMeta与生产者消费者的具体实现。

top.ninwoo.kafka.kclient.boot.KafkaHandlerMeta

KafkaHandlerMeta存储了全部的可用信息,该类实现比较简单,主要分析其成员对象。

Object bean : 存储底层的bean对象

Method method : 存储方法对象

Class<? extends Object> parameterType : 存储参数的类型

InputConsumer inputConsumer : 输入消费者注解对象,其中存储着创建Consumer需要的配置

OutputProducer outputProducer : 输出生产者注解对象,其中存储着创建Producer需要的配置

Map<ErrorHandler, Method> errorHandlers = new HashMap<ErrorHandler, Method>() 异常处理函数与其方法组成的Map

top.ninwoo.kafka.kclient.core.KafkaProducer

该类主要通过多态封装了kafka Producer的接口,提供了更加灵活丰富的api接口,比较简单不做赘述。

top.ninwoo.kafka.kclient.core.KafkaConsumer

该类的核心功能是:

加载配置文件

初始化线程池

初始化GracefullyShutdown函数

初始化kafka连接

在这里跳过构造函数,但在进入核心问题前,先明确几个成员变量的作用。

streamNum : 创建消息流的数量

fixedThreadNum : 异步线程池中的线程数量

minThreadNum : 异步线程池的最小线程数

maxThreadNum : 异步线程池的最大线程数

stream : kafka消息流

streamThreadPool : kafka消息处理线程池

在每个构造函数后都调用了init()方法,所以我们从init()入手。另外一个核心方法startup()将在介绍完init()函数进行介绍。

init()

在执行核心代码前,进行了一系列的验证,这里跳过该部分。

1.加载配置文件

properties = loadPropertiesfile();

2.如果共享异步线程池,则初始化异步线程池

sharedAsyncThreadPool = initAsyncThreadPool();

3.初始化优雅关闭

initGracefullyShutdown();

4.初始化kafka连接

initKafka(); initAsyncThreadPool()

完整代码如下:

private ExecutorService initAsyncThreadPool() { ExecutorService syncThreadPool = null; if (fixedThreadNum > 0) syncThreadPool = Executors.newFixedThreadPool(fixedThreadNum); else syncThreadPool = new ThreadPoolExecutor(minThreadNum, maxThreadNum, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); return syncThreadPool; }

首先,如果异步线程数大于0,则使用该参数进行创建线程池。

syncThreadPool = Executors.newFixedThreadPool(fixedThreadNum);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjyzp.html