KClient——kafka消息中间件源码解读

最近在拜读李艳鹏的《可伸缩服务架构——框架与中间件》,该篇随笔,针对第二章的KClient(kafka消息中间件)源码解读项目,进行学习。

kclient消息中间件

从使用角度上开始入手学习

kclient-processor

该项目使用springboot调用kclient库,程序目录如下:

domain

Cat : 定义了一个cat对象

Dog : 定义了一个Dog对象

handler : 消息处理器

AnimalsHandler : 定义了Cat和Dog的具体行为

KClientApplication.java : Spring boot的主函数——程序执行入口

KClientController.java : Controller 文件

top.ninwoo.kclient.app.KClientApplication

1.启动Spring Boot

ApplicationContext ctxBackend = SpringApplication.run( KClientApplication.class, args);

2.启动程序后将自动加载KClientController(@RestController)

top.ninwoo.kclient.app.KClientController

1.通过@RestController,使@SpringBootApplication,可以自动加载该Controller

2.通过kafka-application.xml加载Beans

private ApplicationContext ctxKafkaProcessor = new ClassPathXmlApplicationContext("kafka-application.xml");

kafka-application.xml声明了一个kclient bean,并设置其初始化执行init方法,具体实现见下章具体实现。

<bean init-method="init"/>

另外声明了一个扫描消息处理器的bean

<context:component-scan base-package="top.ninwoo.kclient.app.handler" />

具体内容在下一节介绍

使用@RequestMapping定义/,/status,/stop,/restart定义了不同的接口

这些接口实现比较简单,需要注意的是他们调用的getKClientBoot()函数。

上文,我们已经通过xml中,添加了两个Bean,调用Bean的具体实现方法如下:

private KClientBoot getKClientBoot() { return (KClientBoot) ctxKafkaProcessor.getBean("kClientBoot"); }

通过Bean获取到KClient获取到了KClientBoot对象,便可以调用其具体方法。

top.ninwoo.kclient.app.handler.AnimalsHandler

消息处理函数

1.使用@KafkaHandlers进行声明bean,关于其具体实现及介绍在具体实现中进行介绍

2.定义了三个处理函数

dogHandler

catHandler

ioExceptionHandler

dogHandler

具体处理很简单,主要分析@InputConsumer和@Consumer的作用,具体实现将在后续进行介绍。

@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1") public Cat dogHandler(Dog dog) { System.out.println("Annotated dogHandler handles: " + dog); return new Cat(dog); }

@InputConsumer根据输入参数定义了一个Consumer,通过该Consumer传递具体值给dog,作为该处理函数的
输入。

@OutputProducer根据输入参数定义一个Producer,而该处理函数最后返回的Cat对象,将通过该Producer最终传递到Kafka中

以下的功能与上述相同,唯一需要注意的是 @InputConsumer和@OutputProducer可以单独存在。

@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) public void catHandler(Cat cat) throws IOException { System.out.println("Annotated catHandler handles: " + cat); throw new IOException("Man made exception."); } @ErrorHandler(exception = IOException.class, topic = "test1") public void ioExceptionHandler(IOException e, String message) { System.out.println("Annotated excepHandler handles: " + e); } top.ninwoo.kclient.app.domain

只是定义了Cat和Dog对象,不做赘述。

总结

到这里,总结下我们都实现了哪些功能?

程序启动

调用KClientBoot.init()方法

AnimalsHandler定义了消费者和生产者的具体方法。

kclient-core

kclient消息中间件的主体部分,该部分将会涉及

kafka基本操作

反射

项目结构如下:

boot

ErrorHandler

InputConsumer

OutputProducer

KafkaHandlers

KClientBoot

KafkaHandler

KafkaHandlerMeta

core

KafkaConsumer

KafkaProducer

excephandler

DefaultExceptionHandler

ExceptionHandler

handlers

BeanMessageHandler

BeansMessageHandler

DocumentMessageHandler

ObjectMessageHandler

ObjectsMessageHandler

MessageHandler

SafelyMessageHandler

reflection.util

AnnotationHandler

AnnotationTranversor

TranversorContext

在接下来的源码阅读中,我将按照程序执行的顺序进行解读。如果其中涉及到没有讨论过的模块,读者可以向下翻阅。这么
做的唯一原因,为了保证思维的连续性,尽可能不被繁杂的程序打乱。

top.ninwoo.kafka.kclient.boot.KClientBoot

如果读者刚刚阅读上一章节,那么可能记得,我们注册了一个kClientBoot的bean,并设置了初始化函数init(),所以,在kclient源码的阅读中
,我们将从该文件入手,开始解读。

public void init() { meta = getKafkaHandlerMeta(); if (meta.size() == 0) throw new IllegalArgumentException( "No handler method is declared in this spring context."); for (final KafkaHandlerMeta kafkaHandlerMeta : meta) { createKafkaHandler(kafkaHandlerMeta); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjyzp.html