3.如果生产者非空,意味着需要通过生产者程序将结果发送到Kafka中
if (kafkaProducer != null) { if (result instanceof JSONObject) kafkaProducer.send(((JSONObject) result).toJSONString()); else if (result instanceof JSONArray) kafkaProducer.send(((JSONArray) result).toJSONString()); else if (result instanceof Document) kafkaProducer.send(((Document) result).getTextContent()); else kafkaProducer.send(JSON.toJSONString(result)); 生产者和消费者创建方法 protected KafkaConsumer createConsumer( final KafkaHandlerMeta kafkaHandlerMeta, MessageHandler beanMessageHandler) { KafkaConsumer kafkaConsumer = null; if (kafkaHandlerMeta.getInputConsumer().fixedThreadNum() > 0) { kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta .getInputConsumer().propertiesFile(), kafkaHandlerMeta .getInputConsumer().topic(), kafkaHandlerMeta .getInputConsumer().streamNum(), kafkaHandlerMeta .getInputConsumer().fixedThreadNum(), beanMessageHandler); } else if (kafkaHandlerMeta.getInputConsumer().maxThreadNum() > 0 && kafkaHandlerMeta.getInputConsumer().minThreadNum() < kafkaHandlerMeta .getInputConsumer().maxThreadNum()) { kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta .getInputConsumer().propertiesFile(), kafkaHandlerMeta .getInputConsumer().topic(), kafkaHandlerMeta .getInputConsumer().streamNum(), kafkaHandlerMeta .getInputConsumer().minThreadNum(), kafkaHandlerMeta .getInputConsumer().maxThreadNum(), beanMessageHandler); } else { kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta .getInputConsumer().propertiesFile(), kafkaHandlerMeta .getInputConsumer().topic(), kafkaHandlerMeta .getInputConsumer().streamNum(), beanMessageHandler); } return kafkaConsumer; } protected KafkaProducer createProducer( final KafkaHandlerMeta kafkaHandlerMeta) { KafkaProducer kafkaProducer = null; if (kafkaHandlerMeta.getOutputProducer() != null) { kafkaProducer = new KafkaProducer(kafkaHandlerMeta .getOutputProducer().propertiesFile(), kafkaHandlerMeta .getOutputProducer().defaultTopic()); } // It may return null return kafkaProducer; }这两部分比较简单,不做赘述。
小结KClientBoot.java实现了:
获取使用KafkaHandlers中定义注释的方法及其它信息
基于反射机制,生成处理函数。
执行处理函数
创建对应Producer和Consumer
还剩余几个比较简单的部分,比如shutdownAll()等方法,将在具体实现处进行补充介绍。
到此,整个项目的主体功能都已经实现。接下来,将分析上文中出现频率最高的kafkaHandlerMeta与生产者消费者的具体实现。
top.ninwoo.kafka.kclient.boot.KafkaHandlerMetaKafkaHandlerMeta存储了全部的可用信息,该类实现比较简单,主要分析其成员对象。
Object bean : 存储底层的bean对象
Method method : 存储方法对象
Class<? extends Object> parameterType : 存储参数的类型
InputConsumer inputConsumer : 输入消费者注解对象,其中存储着创建Consumer需要的配置
OutputProducer outputProducer : 输出生产者注解对象,其中存储着创建Producer需要的配置
Map<ErrorHandler, Method> errorHandlers = new HashMap<ErrorHandler, Method>() 异常处理函数与其方法组成的Map
top.ninwoo.kafka.kclient.core.KafkaProducer该类主要通过多态封装了kafka Producer的接口,提供了更加灵活丰富的api接口,比较简单不做赘述。
top.ninwoo.kafka.kclient.core.KafkaConsumer该类的核心功能是:
加载配置文件
初始化线程池
初始化GracefullyShutdown函数
初始化kafka连接
在这里跳过构造函数,但在进入核心问题前,先明确几个成员变量的作用。
streamNum : 创建消息流的数量
fixedThreadNum : 异步线程池中的线程数量
minThreadNum : 异步线程池的最小线程数
maxThreadNum : 异步线程池的最大线程数
stream : kafka消息流
streamThreadPool : kafka消息处理线程池
在每个构造函数后都调用了init()方法,所以我们从init()入手。另外一个核心方法startup()将在介绍完init()函数进行介绍。
init()在执行核心代码前,进行了一系列的验证,这里跳过该部分。
1.加载配置文件
properties = loadPropertiesfile();2.如果共享异步线程池,则初始化异步线程池
sharedAsyncThreadPool = initAsyncThreadPool();3.初始化优雅关闭
initGracefullyShutdown();4.初始化kafka连接
initKafka(); initAsyncThreadPool()完整代码如下:
private ExecutorService initAsyncThreadPool() { ExecutorService syncThreadPool = null; if (fixedThreadNum > 0) syncThreadPool = Executors.newFixedThreadPool(fixedThreadNum); else syncThreadPool = new ThreadPoolExecutor(minThreadNum, maxThreadNum, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); return syncThreadPool; }首先,如果异步线程数大于0,则使用该参数进行创建线程池。
syncThreadPool = Executors.newFixedThreadPool(fixedThreadNum);