简单的来说假如有10条数据,在第5条的时候进行提交了offset下标,那么服务端就知道该组消费的下标到第5条了,如果同组其他的consumer进行消费的时候就会从第6条开始进行消费。但是本地的消费者客户端并不会因此而改变,它还是会继续消费下去,并不会再次从第6条开始消费,所以会出现上图情况。
但是项目中运行之后,是不会因此而重启的,所以这时我们可以换一种思路。
就是如果触发某个条件,所以导致offset未提交,我们就可以关闭之前的consumer,然后新new一个consumer,这样就可以再次进行消费了! 当然配置要和之前的一样。
那么将之前的提交代码更改如下:
if(list.size()==50){ consumer.commitSync(); }else if(list.size()>50){ consumer.close(); init(); list.clear(); list2.clear(); }注:这里因为是测试,为了简单明了,所以条件我写的很简单。实际情况请根据个人的为准。
示例图如下:
说明:
1.因为每次是拉取10条,所以在60条的时候kafka的配置初始化了,然后又从新拉取了50-60条的数据,但是没有提交,所以并不会影响实际结果。
2.这里为了方便截图展示,所以打印条件改了,但是不影响程序!
从测试结果中,我们达到了之前想要测试的目的,未提交的offset可以重复进行消费。
这种做法一般也可以满足大部分需求。
例如从kafka获取数据入库,如果一批数据入库成功,就提交offset,否则不提交,然后再次拉取。
但是这种做法并不能最大的保证数据的完整性。比如在运行的时候,程序挂了之类的。
所以还有一种方法是手动的指定offset下标进行获取数据,直到kafka的数据处理成功之后,将offset记录下来,比如写在数据库中。那么这种做法,等到下一篇再进行尝试吧!
该项目我放在github上了,有兴趣的可以看看!
地址:https://github.com/xuwujing/kafka