KClient——kafka消息中间件源码解读 (5)

或许您还比较迷惑如何在这个抽象类中实现我们具体的消费方法,实际上是通过子类实现handleMessage方法进行绑定我们具体的消费方法。

class SequentialMessageTask extends AbstractMessageTask { SequentialMessageTask(KafkaStream<String, String> stream, MessageHandler messageHandler) { super(stream, messageHandler); } @Override protected void handleMessage(String message) { messageHandler.execute(message); } }

在该子类中,handleMessage直接执行了messageHandler.execute(message),而没有调用线程池,所以是顺序消费消息。

class ConcurrentMessageTask extends AbstractMessageTask { private ExecutorService asyncThreadPool; ConcurrentMessageTask(KafkaStream<String, String> stream, MessageHandler messageHandler, int threadNum) { super(stream, messageHandler); if (isSharedAsyncThreadPool) asyncThreadPool = sharedAsyncThreadPool; else { asyncThreadPool = initAsyncThreadPool(); } } @Override protected void handleMessage(final String message) { asyncThreadPool.submit(new Runnable() { public void run() { // if it blows, how to recover messageHandler.execute(message); } }); } protected void shutdown() { if (!isSharedAsyncThreadPool) shutdownThreadPool(asyncThreadPool, "async-pool-" + Thread.currentThread().getId()); } }

在ConcurrentMessageTask中, handleMessage调用asyncThreadPool.submit()提交了任务到异步线程池中,是一个并发消费。

而messageHandler是通过KClientBoot的createKafkaHandler创建并发送过来的,所以实现了最终的消费。

总结:

到此全部的项目解读完毕,如果仍有疑惑,可以参看李艳鹏老师的《可伸缩服务架构框架与中间件》一书,同时也可以与我联系交流问题。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjyzp.html