KClient——kafka消息中间件源码解读 (2)

1.该函数,首先获取了一个HandlerMeta,我们可以简单理解,在这个数据元中,存储了全部的Handler信息,这个Handler信息指的是上一章节中通过@KafkaHandlers定义的处理函数,
具体实现见top.ninwoo.kafka.kclient.boot.KafkaHandlerMeta。

2.获取数据元之后,通过循环,创建对应的处理函数。

for (final KafkaHandlerMeta kafkaHandlerMeta : meta) { createKafkaHandler(kafkaHandlerMeta); }

3.getKafkaHandlerMeta函数的具体实现

a.通过applicationContext获取包含kafkaHandlers注解的Bean名称。

String[] kafkaHandlerBeanNames = applicationContext .getBeanNamesForAnnotation(KafkaHandlers.class);

b.通过BeanName获取到Bean对象

Object kafkaHandlerBean = applicationContext .getBean(kafkaHandlerBeanName); Class<? extends Object> kafkaHandlerBeanClazz = kafkaHandlerBean .getClass();

c.构建mapData数据结构,具体构建见top.ninwoo.kafka.kclient.reflection.util.AnnotationTranversor

Map<Class<? extends Annotation>, Map<Method, Annotation>> mapData = extractAnnotationMaps(kafkaHandlerBeanClazz);

d.map转数据元并添加到数据元meta list中。

meta.addAll(convertAnnotationMaps2Meta(mapData, kafkaHandlerBean));

4.循环遍历创建kafkaHandler

for (final KafkaHandlerMeta kafkaHandlerMeta : meta) { createKafkaHandler(kafkaHandlerMeta); } createKafkaHandler()函数的具体实现:

a.通过meta获取clazz中的参数类型

Class<? extends Object> paramClazz = kafkaHandlerMeta .getParameterType()

b.创建kafkaProducer

KafkaProducer kafkaProducer = createProducer(kafkaHandlerMeta);

c.创建ExceptionHandler

List<ExceptionHandler> excepHandlers = createExceptionHandlers(kafkaHandlerMeta);

d.根据clazz的参数类型,选择消息转换函数

MessageHandler beanMessageHandler = null; if (paramClazz.isAssignableFrom(JSONObject.class)) { beanMessageHandler = createObjectHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); } else if (paramClazz.isAssignableFrom(JSONArray.class)) { beanMessageHandler = createObjectsHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); } else if (List.class.isAssignableFrom(Document.class)) { beanMessageHandler = createDocumentHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); } else if (List.class.isAssignableFrom(paramClazz)) { beanMessageHandler = createBeansHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); } else { beanMessageHandler = createBeanHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); }

e.创建kafkaConsumer,并启动

KafkaConsumer kafkaConsumer = createConsumer(kafkaHandlerMeta, beanMessageHandler); kafkaConsumer.startup();

f.创建KafkaHanlder,并添加到列表中

KafkaHandler kafkaHandler = new KafkaHandler(kafkaConsumer, kafkaProducer, excepHandlers, kafkaHandlerMeta); kafkaHandlers.add(kafkaHandler); createExceptionHandlers的具体实现

1.创建一个异常处理列表

List<ExceptionHandler> excepHandlers = new ArrayList<ExceptionHandler>();

2.从kafkaHandlerMeta获取异常处理的注解

for (final Map.Entry<ErrorHandler, Method> errorHandler : kafkaHandlerMeta .getErrorHandlers().entrySet()) {

3.创建一个异常处理对象

ExceptionHandler exceptionHandler = new ExceptionHandler() { public boolean support(Throwable t) {} public void handle(Throwable t, String message) {} support方法判断异常类型是否和输入相同 public boolean support(Throwable t) { // We handle the exception when the classes are exactly same return errorHandler.getKey().exception() == t.getClass(); } handler方法,进一步对异常进行处理

1.获取异常处理方法

Method excepHandlerMethod = errorHandler.getValue();

2.使用Method.invoke执行异常处理方法

excepHandlerMethod.invoke(kafkaHandlerMeta.getBean(), t, message);

这里用到了一些反射原理,以下对invoke做简单介绍

public Object invoke(Object obj, Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException

参数:

obj 从底层方法被调用的对象

args 用于方法的参数

在该项目中的实际情况如下:

Method实际对应top.ninwoo.kclient.app.handler.AnimalsHandler中的:

@ErrorHandler(exception = IOException.class, topic = "test1") public void ioExceptionHandler(IOException e, String message) { System.out.println("Annotated excepHandler handles: " + e); }

参数方面:

kafkaHandlerMeta.getBean() : AninmalsHandler

t

message

invoke完成之后,将会执行ioExceptionHandler函数

4.添加异常处理到列表中

excepHandlers.add(exceptionHandler); createObjectHandler createObjectsHandler createDocumentHandler createBeanHandler createBeansHandler

以上均实现了类似的功能,只是创建了不同类型的对象,然后重写了不同的执行函数。

实现原理和异常处理相同,底层都是调用了invoke函数,通过反射机制启动了对应的函数。

下一节对此做了详细介绍

invokeHandler

1.获取对应Method方法

Method kafkaHandlerMethod = kafkaHandlerMeta.getMethod();

2.执行接收返回结果

Object result = kafkaHandlerMethod.invoke( kafkaHandlerMeta.getBean(), parameter);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjyzp.html