我们可以把loghub当作一个消息中间件来使用。如果能知道当前的消费进度,自然好了,否则消费情况一无所知,总是有点慌!
loghub消费分两种情况,一是普通消费,二是消费组消费;
消费组消费,loghub服务端会记录消费情况,这时可以通过调用服务端API进行偏移信息查询。
普通消费则不同,需要自行维护偏移量,即只有自己知道偏移信息,自己处理延迟。我们主要讨论这种情况。
一、 消费loghub数据的样例如下:
// 普通消费 private static void consumeDataFromShard(int shardId) throws Exception { String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor(); System.out.println("cursor = " +cursor); try { while (true) { PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor); PullLogsResponse response = client.pullLogs(request); List<LogGroupData> logGroups = response.getLogGroups(); if (logGroups.isEmpty()) { return; } System.out.println(response.getCount()); System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor()); logGroups.forEach(rec1 -> { // do your biz }); cursor = response.getNextCursor(); Thread.sleep(200); } } catch(LogException e) { System.out.println(e.GetRequestId() + e.GetErrorMessage()); } }