最近在拜读李艳鹏的《可伸缩服务架构——框架与中间件》,该篇随笔,针对第二章的KClient(kafka消息中间件)源码解读项目,进行学习。
kclient消息中间件从使用角度上开始入手学习
kclient-processor该项目使用springboot调用kclient库,程序目录如下:
domain
Cat : 定义了一个cat对象
Dog : 定义了一个Dog对象
handler : 消息处理器
AnimalsHandler : 定义了Cat和Dog的具体行为
KClientApplication.java : Spring boot的主函数——程序执行入口
KClientController.java : Controller 文件
top.ninwoo.kclient.app.KClientApplication1.启动Spring Boot
ApplicationContext ctxBackend = SpringApplication.run( KClientApplication.class, args);2.启动程序后将自动加载KClientController(@RestController)
top.ninwoo.kclient.app.KClientController1.通过@RestController,使@SpringBootApplication,可以自动加载该Controller
2.通过kafka-application.xml加载Beans
private ApplicationContext ctxKafkaProcessor = new ClassPathXmlApplicationContext("kafka-application.xml");kafka-application.xml声明了一个kclient bean,并设置其初始化执行init方法,具体实现见下章具体实现。
<bean init-method="init"/>另外声明了一个扫描消息处理器的bean
<context:component-scan base-package="top.ninwoo.kclient.app.handler" />具体内容在下一节介绍
使用@RequestMapping定义/,/status,/stop,/restart定义了不同的接口
这些接口实现比较简单,需要注意的是他们调用的getKClientBoot()函数。
上文,我们已经通过xml中,添加了两个Bean,调用Bean的具体实现方法如下:
private KClientBoot getKClientBoot() { return (KClientBoot) ctxKafkaProcessor.getBean("kClientBoot"); }通过Bean获取到KClient获取到了KClientBoot对象,便可以调用其具体方法。
top.ninwoo.kclient.app.handler.AnimalsHandler消息处理函数
1.使用@KafkaHandlers进行声明bean,关于其具体实现及介绍在具体实现中进行介绍
2.定义了三个处理函数
dogHandler
catHandler
ioExceptionHandler
dogHandler
具体处理很简单,主要分析@InputConsumer和@Consumer的作用,具体实现将在后续进行介绍。
@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1") public Cat dogHandler(Dog dog) { System.out.println("Annotated dogHandler handles: " + dog); return new Cat(dog); }
@InputConsumer根据输入参数定义了一个Consumer,通过该Consumer传递具体值给dog,作为该处理函数的
输入。
@OutputProducer根据输入参数定义一个Producer,而该处理函数最后返回的Cat对象,将通过该Producer最终传递到Kafka中
以下的功能与上述相同,唯一需要注意的是 @InputConsumer和@OutputProducer可以单独存在。
@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) public void catHandler(Cat cat) throws IOException { System.out.println("Annotated catHandler handles: " + cat); throw new IOException("Man made exception."); } @ErrorHandler(exception = IOException.class, topic = "test1") public void ioExceptionHandler(IOException e, String message) { System.out.println("Annotated excepHandler handles: " + e); } top.ninwoo.kclient.app.domain只是定义了Cat和Dog对象,不做赘述。
总结到这里,总结下我们都实现了哪些功能?
程序启动
调用KClientBoot.init()方法
AnimalsHandler定义了消费者和生产者的具体方法。
kclient-corekclient消息中间件的主体部分,该部分将会涉及
kafka基本操作
反射
项目结构如下:
boot
ErrorHandler
InputConsumer
OutputProducer
KafkaHandlers
KClientBoot
KafkaHandler
KafkaHandlerMeta
core
KafkaConsumer
KafkaProducer
excephandler
DefaultExceptionHandler
ExceptionHandler
handlers
BeanMessageHandler
BeansMessageHandler
DocumentMessageHandler
ObjectMessageHandler
ObjectsMessageHandler
MessageHandler
SafelyMessageHandler
reflection.util
AnnotationHandler
AnnotationTranversor
TranversorContext
在接下来的源码阅读中,我将按照程序执行的顺序进行解读。如果其中涉及到没有讨论过的模块,读者可以向下翻阅。这么
做的唯一原因,为了保证思维的连续性,尽可能不被繁杂的程序打乱。
如果读者刚刚阅读上一章节,那么可能记得,我们注册了一个kClientBoot的bean,并设置了初始化函数init(),所以,在kclient源码的阅读中
,我们将从该文件入手,开始解读。