1.该函数,首先获取了一个HandlerMeta,我们可以简单理解,在这个数据元中,存储了全部的Handler信息,这个Handler信息指的是上一章节中通过@KafkaHandlers定义的处理函数,
具体实现见top.ninwoo.kafka.kclient.boot.KafkaHandlerMeta。
2.获取数据元之后,通过循环,创建对应的处理函数。
for (final KafkaHandlerMeta kafkaHandlerMeta : meta) { createKafkaHandler(kafkaHandlerMeta); }3.getKafkaHandlerMeta函数的具体实现
a.通过applicationContext获取包含kafkaHandlers注解的Bean名称。
String[] kafkaHandlerBeanNames = applicationContext .getBeanNamesForAnnotation(KafkaHandlers.class);b.通过BeanName获取到Bean对象
Object kafkaHandlerBean = applicationContext .getBean(kafkaHandlerBeanName); Class<? extends Object> kafkaHandlerBeanClazz = kafkaHandlerBean .getClass();c.构建mapData数据结构,具体构建见top.ninwoo.kafka.kclient.reflection.util.AnnotationTranversor
Map<Class<? extends Annotation>, Map<Method, Annotation>> mapData = extractAnnotationMaps(kafkaHandlerBeanClazz);d.map转数据元并添加到数据元meta list中。
meta.addAll(convertAnnotationMaps2Meta(mapData, kafkaHandlerBean));4.循环遍历创建kafkaHandler
for (final KafkaHandlerMeta kafkaHandlerMeta : meta) { createKafkaHandler(kafkaHandlerMeta); } createKafkaHandler()函数的具体实现:a.通过meta获取clazz中的参数类型
Class<? extends Object> paramClazz = kafkaHandlerMeta .getParameterType()b.创建kafkaProducer
KafkaProducer kafkaProducer = createProducer(kafkaHandlerMeta);c.创建ExceptionHandler
List<ExceptionHandler> excepHandlers = createExceptionHandlers(kafkaHandlerMeta);d.根据clazz的参数类型,选择消息转换函数
MessageHandler beanMessageHandler = null; if (paramClazz.isAssignableFrom(JSONObject.class)) { beanMessageHandler = createObjectHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); } else if (paramClazz.isAssignableFrom(JSONArray.class)) { beanMessageHandler = createObjectsHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); } else if (List.class.isAssignableFrom(Document.class)) { beanMessageHandler = createDocumentHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); } else if (List.class.isAssignableFrom(paramClazz)) { beanMessageHandler = createBeansHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); } else { beanMessageHandler = createBeanHandler(kafkaHandlerMeta, kafkaProducer, excepHandlers); }e.创建kafkaConsumer,并启动
KafkaConsumer kafkaConsumer = createConsumer(kafkaHandlerMeta, beanMessageHandler); kafkaConsumer.startup();f.创建KafkaHanlder,并添加到列表中
KafkaHandler kafkaHandler = new KafkaHandler(kafkaConsumer, kafkaProducer, excepHandlers, kafkaHandlerMeta); kafkaHandlers.add(kafkaHandler); createExceptionHandlers的具体实现1.创建一个异常处理列表
List<ExceptionHandler> excepHandlers = new ArrayList<ExceptionHandler>();2.从kafkaHandlerMeta获取异常处理的注解
for (final Map.Entry<ErrorHandler, Method> errorHandler : kafkaHandlerMeta .getErrorHandlers().entrySet()) {3.创建一个异常处理对象
ExceptionHandler exceptionHandler = new ExceptionHandler() { public boolean support(Throwable t) {} public void handle(Throwable t, String message) {} support方法判断异常类型是否和输入相同 public boolean support(Throwable t) { // We handle the exception when the classes are exactly same return errorHandler.getKey().exception() == t.getClass(); } handler方法,进一步对异常进行处理1.获取异常处理方法
Method excepHandlerMethod = errorHandler.getValue();2.使用Method.invoke执行异常处理方法
excepHandlerMethod.invoke(kafkaHandlerMeta.getBean(), t, message);这里用到了一些反射原理,以下对invoke做简单介绍
public Object invoke(Object obj, Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException参数:
obj 从底层方法被调用的对象
args 用于方法的参数
在该项目中的实际情况如下:
Method实际对应top.ninwoo.kclient.app.handler.AnimalsHandler中的:
@ErrorHandler(exception = IOException.class, topic = "test1") public void ioExceptionHandler(IOException e, String message) { System.out.println("Annotated excepHandler handles: " + e); }参数方面:
kafkaHandlerMeta.getBean() : AninmalsHandler
t
message
invoke完成之后,将会执行ioExceptionHandler函数
4.添加异常处理到列表中
excepHandlers.add(exceptionHandler); createObjectHandler createObjectsHandler createDocumentHandler createBeanHandler createBeansHandler以上均实现了类似的功能,只是创建了不同类型的对象,然后重写了不同的执行函数。
实现原理和异常处理相同,底层都是调用了invoke函数,通过反射机制启动了对应的函数。
下一节对此做了详细介绍
invokeHandler1.获取对应Method方法
Method kafkaHandlerMethod = kafkaHandlerMeta.getMethod();2.执行接收返回结果
Object result = kafkaHandlerMethod.invoke( kafkaHandlerMeta.getBean(), parameter);