或许您还比较迷惑如何在这个抽象类中实现我们具体的消费方法,实际上是通过子类实现handleMessage方法进行绑定我们具体的消费方法。
class SequentialMessageTask extends AbstractMessageTask { SequentialMessageTask(KafkaStream<String, String> stream, MessageHandler messageHandler) { super(stream, messageHandler); } @Override protected void handleMessage(String message) { messageHandler.execute(message); } }在该子类中,handleMessage直接执行了messageHandler.execute(message),而没有调用线程池,所以是顺序消费消息。
class ConcurrentMessageTask extends AbstractMessageTask { private ExecutorService asyncThreadPool; ConcurrentMessageTask(KafkaStream<String, String> stream, MessageHandler messageHandler, int threadNum) { super(stream, messageHandler); if (isSharedAsyncThreadPool) asyncThreadPool = sharedAsyncThreadPool; else { asyncThreadPool = initAsyncThreadPool(); } } @Override protected void handleMessage(final String message) { asyncThreadPool.submit(new Runnable() { public void run() { // if it blows, how to recover messageHandler.execute(message); } }); } protected void shutdown() { if (!isSharedAsyncThreadPool) shutdownThreadPool(asyncThreadPool, "async-pool-" + Thread.currentThread().getId()); } }在ConcurrentMessageTask中, handleMessage调用asyncThreadPool.submit()提交了任务到异步线程池中,是一个并发消费。
而messageHandler是通过KClientBoot的createKafkaHandler创建并发送过来的,所以实现了最终的消费。
总结:到此全部的项目解读完毕,如果仍有疑惑,可以参看李艳鹏老师的《可伸缩服务架构框架与中间件》一书,同时也可以与我联系交流问题。