如何做一个实时的业务统计的监控?比如分钟级?也就是每分钟可以快速看到业务的变化趋势,及可以做一些简单的分组查询?
哎,你可能说很简单了,直接从数据库 count 就可以了! 你是对的。
但如果不允许你使用db进行count呢?因为线上数据库资源可是很宝贵的哦,你这一count可能会给db带来灾难了。
那不然咋整?
没有db,我们还有其他数据源嘛,比如: 消息队列?埋点数据? 本文将是基于该前提而行。
做监控,尽量不要侵入业务太多!所以有一个消息中间件是至关重要的。针对大数据系统,一般是: kafka 或者 类kafka. (如本文基础 loghub)
有了消息中间件,如何进行分钟级监控? 这个应该就很简单了吧。不过如果要自己实现,其实坑也不少的!
如果自己实现计数,那么你可能需要做以下几件事:
1. 每消费一个消息,你需要一个累加器;
2. 每隔一个周期,你可能需要一个归档操作;
3. 你可能需要考虑各种并发安全问题;
4. 你可能需要考虑种性能问题;
5. 你可能需要考虑各种机器故障问题;
6. 你可能需要考虑各种边界值问题;
哎,其实没那么难。时间序列数据库,就专门为这类事情而生!如OpenTSDB:
可以说,TSDB 是这类应用场景的杀手锏。或者基于流计算框架: 如flink, 也是很轻松完成的事。但是不是本文的方向,略过!
本文是基于 loghub 的现有数据,进行分钟级统计后,入库 mysql 中,从而支持随时查询。(因loghub每次查询都是要钱的,所以,不可能直接查询)
loghub 数据结构如: 2019-07-10 10:01:11,billNo,userId,productCode,...
由于loghub提供了很多强大的查询统计功能,所以我们可以直接使用了。
核心功能就是一个统计sql,还是比较简单的。但是需要考虑的点也不少,接下来,将为看官们奉上一个完整的解决方案!
撸代码去!
1. 核心统计任务实现类 MinuteBizDataCounterTaskimport com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.common.LogContent; import com.aliyun.openservices.log.common.LogItem; import com.aliyun.openservices.log.common.QueriedLog; import com.aliyun.openservices.log.exception.LogException; import com.aliyun.openservices.log.response.GetLogsResponse; import com.my.service.statistics.StatisticsService; import com.my.entity.BizDataStatisticsMin; import com.my.model.LoghubQueryCounterOffsetModel; import com.my.util.loghub.LogHubProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.math.BigDecimal; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; /** * 基于loghub 的分钟级 统计任务 */ @Component @Slf4j public class MinuteBizDataCounterTask implements Runnable { @Resource private LogHubProperties logHubProperties; @Resource private StatisticsService statisticsService; @Resource(name = "defaultOffsetQueryTaskCallback") private DefaultOffsetQueryTaskCallbackImpl defaultOffsetQueryTaskCallback; /** * loghub 客户端 */ private volatile Client mClient; /** * 过滤的topic */ private static final String LOGHUB_TOPIC = "topic_test"; /** * 单次扫描loghub最大时间 间隔分钟数 */ @Value("${loghub.offset.counter.perScanMaxMinutesGap}") private Integer perScanMaxMinutesGap; /** * 单次循环最大数 */ @Value("${loghub.offset.counter.perScanMaxRecordsLimit}") private Integer perScanMaxRecordsLimit; /** * 构造必要实例信息 */ public ProposalPolicyBizDataCounterTask() { } @Override public void run() { if(mClient == null) { this.mClient = new Client(logHubProperties.getEndpoint(), logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey()); } while (!Thread.interrupted()) { try { updateLastMinutePolicyNoCounter(); Thread.sleep(60000); } catch (InterruptedException e) { log.error("【分钟级统计task】, sleep 中断", e); Thread.currentThread().interrupt(); } catch (Exception e) { // 注意此处可能有风险,发生异常后将快速死循环 log.error("【分钟级统计task】更新异常", e); try { Thread.sleep(10000); } catch (InterruptedException ex) { log.error("【分钟级统计task】异常,且sleep异常", ex); Thread.currentThread().interrupt(); } } } } /** * 更新最近的数据 (分钟级) * * @throws LogException loghub查询异常时抛出 */ private void updateLastMinutePolicyNoCounter() throws LogException { updateMinutePolicyNoCounter(null); } /** * 更新最近的数据 */ public Integer updateMinutePolicyNoCounter(LoghubQueryCounterOffsetModel specifyOffset) throws LogException { // 1. 获取偏移量 // 2. 根据偏移量,判定是否可以一次性取完,或者多次获取更新 // 3. 从loghub中设置偏移量,获取统计数据,更新 // 4. 更新db数据统计值 // 5. 更新偏移量 // 6. 等待下一次更新 // 指定offset时,可能为补数据 final LoghubQueryCounterOffsetModel destOffset = enhanceQueryOffset(specifyOffset); initSharedQueryOffset(destOffset, destOffset == specifyOffset); Integer totalAffectNum = 0; while (!isScanFinishOnDestination(destOffset)) { // 完整扫描一次时间周期 calcNextSharedQueryOffset(destOffset); while (true) { calcNextInnerQueryOffset(); ArrayList<QueriedLog> logs = queryPerMinuteStatisticFromLoghubOnCurrentOffset(); Integer affectNum = handleMiniOffsetBatchCounter(logs); totalAffectNum += affectNum; log.info("【分钟级统计task】本次更新数据:{}, offset:{}", affectNum, getCurrentSharedQueryOffset()); if(!hasMoreDataOffset(logs.size())) { rolloverOffsetAndCommit(); break; } } } log.info("【分钟级统计task】本次更新数据,总共:{}, destOffset:{}, curOffset:{}", totalAffectNum, destOffset, getCurrentSharedQueryOffset()); rolloverOffsetAndCommit(); return totalAffectNum; } /** * 处理一小批的统计数据 * * @param logs 小批统计loghub数据 * @return 影响行数 */ private Integer handleMiniOffsetBatchCounter(ArrayList<QueriedLog> logs) { if (logs == null || logs.isEmpty()) { return 0; } List<BizDataStatisticsMin> statisticsMinList = new ArrayList<>(); for (QueriedLog log1 : logs) { LogItem getLogItem = log1.GetLogItem(); BizDataStatisticsMin statisticsMin1 = adaptStatisticsMinDbData(getLogItem); statisticsMin1.setEventCode(PROPOSAL_FOUR_IN_ONE_TOPIC); statisticsMin1.setEtlVersion(getCurrentScanTimeDuring() + ":" + statisticsMin1.getStatisticsCount()); statisticsMinList.add(statisticsMin1); } return statisticsService.batchUpsertPremiumStatistics(statisticsMinList, getCurrentOffsetCallback()); } /** * 获取共享偏移信息 * * @return 偏移 */ private LoghubQueryCounterOffsetModel getCurrentSharedQueryOffset() { return defaultOffsetQueryTaskCallback.getCurrentOffset(); } /** * 判断本次是否扫描完成 * * @param destOffset 目标偏移 * @return true:扫描完成, false: 未完成 */ private boolean isScanFinishOnDestination(LoghubQueryCounterOffsetModel destOffset) { return defaultOffsetQueryTaskCallback.getEndTime() >= destOffset.getEndTime(); } /** * 获取偏移提交回调器 * * @return 回调实例 */ private OffsetQueryTaskCallback getCurrentOffsetCallback() { return defaultOffsetQueryTaskCallback; } /** * 初始化共享的查询偏移变量 * * @param destOffset 目标偏移 * @param isSpecifyOffset 是否是手动指定的偏移 */ private void initSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset, boolean isSpecifyOffset) { // 整分花时间数据 Integer queryStartTime = destOffset.getStartTime(); if(queryStartTime % 60 != 0) { queryStartTime = queryStartTime / 60 * 60; } // 将目标扫描时间终点 设置为起点,以备后续迭代 defaultOffsetQueryTaskCallback.initCurrentOffset(queryStartTime, queryStartTime, destOffset.getOffsetStart(), destOffset.getLimit(), destOffset.getIsNewStep(), isSpecifyOffset); if(defaultOffsetQueryTaskCallback.getIsNewStep()) { resetOffsetDefaultSettings(); } } /** * 计算下一次统计偏移时间 * * @param destOffset 目标偏移值 */ private void calcNextSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset) { int perScanMaxSecondsGap = perScanMaxMinutesGap * 60; if(destOffset.getEndTime() - defaultOffsetQueryTaskCallback.getStartTime() > perScanMaxSecondsGap) { defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime()); int nextExpectEndTime = defaultOffsetQueryTaskCallback.getStartTime() + perScanMaxSecondsGap; if(nextExpectEndTime > destOffset.getEndTime()) { nextExpectEndTime = destOffset.getEndTime(); } defaultOffsetQueryTaskCallback.setEndTime(nextExpectEndTime); } else { defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime()); defaultOffsetQueryTaskCallback.setEndTime(destOffset.getEndTime()); } resetOffsetDefaultSettings(); } /** * 重置偏移默认配置 */ private void resetOffsetDefaultSettings() { defaultOffsetQueryTaskCallback.setIsNewStep(true); defaultOffsetQueryTaskCallback.setOffsetStart(0); defaultOffsetQueryTaskCallback.setLimit(0); } /** * 计算下一次小偏移,此种情况应对 一次外部偏移未查询完成的情况 */ private void calcNextInnerQueryOffset() { defaultOffsetQueryTaskCallback.setIsNewStep(false); // 第一次计算时,limit 为0, 所以得出的 offsetStart 也是0 defaultOffsetQueryTaskCallback.setOffsetStart( defaultOffsetQueryTaskCallback.getOffsetStart() + defaultOffsetQueryTaskCallback.getLimit()); defaultOffsetQueryTaskCallback.setLimit(perScanMaxRecordsLimit); } /** * 获取当前循环的扫描区间 * * @return 15567563433-1635345099 区间 */ private String getCurrentScanTimeDuring() { return defaultOffsetQueryTaskCallback.getStartTime() + "-" + defaultOffsetQueryTaskCallback.getEndTime(); } /** * 从loghub查询每分钟的统计信息 * * @return 查询到的统计信息 * @throws LogException loghub 异常时抛出 */ private ArrayList<QueriedLog> queryPerMinuteStatisticFromLoghubOnCurrentOffset() throws LogException { // 先按保单号去重,再进行计数统计 String countSql = "* | split(bizData, ',')[5] policyNo, bizData GROUP by split(bizData, ',')[5] " + " | select count(1) as totalCountMin, " + "split(bizData, ',')[2] as productCode," + "split(bizData, ',')[3] as schemaCode," + "split(bizData, ',')[4] as channelCode," + "substr(split(bizData, ',')[1], 1, 16) as myDateTimeMinute " + "group by substr(split(bizData, ',')[1], 1, 16), split(bizData, ',')[2],split(bizData, ',')[3], split(bizData, ',')[4],split(bizData, ',')[7], split(bizData, ',')[8]"; countSql += " limit " + defaultOffsetQueryTaskCallback.getOffsetStart() + "," + defaultOffsetQueryTaskCallback.getLimit(); GetLogsResponse countResponse = mClient.GetLogs(logHubProperties.getProjectName(), logHubProperties.getBizCoreDataLogStore(), defaultOffsetQueryTaskCallback.getStartTime(), defaultOffsetQueryTaskCallback.getEndTime(), LOGHUB_TOPIC, countSql); if(!countResponse.IsCompleted()) { log.error("【分钟级统计task】扫描获取到未完整的数据,请速检查原因,offSet:{}", getCurrentSharedQueryOffset()); } return countResponse.GetLogs() == null ? new ArrayList<>() : countResponse.GetLogs(); } /** * 根据上一次返回的记录数量,判断是否还有更多数据 * * @param lastGotRecordsCount 上次返回的记录数 (数据量大于最大数说明还有未取完数据) * @return true: 是还有更多数据应该再循环获取, false: 无更多数据结束本期任务 */ private boolean hasMoreDataOffset(int lastGotRecordsCount) { return lastGotRecordsCount >= perScanMaxRecordsLimit; } /** * 加强版的 offset 优先级: 指定偏移 -> 基于缓存的偏移 -> 新生成偏移标识 * * @param specifyOffset 指定偏移(如有) * @return 偏移标识 */ private LoghubQueryCounterOffsetModel enhanceQueryOffset(LoghubQueryCounterOffsetModel specifyOffset) { if(specifyOffset != null) { return specifyOffset; } LoghubQueryCounterOffsetModel offsetBaseOnCache = getNextOffsetBaseOnCache(); if(offsetBaseOnCache != null) { return offsetBaseOnCache; } return generateNewOffset(); } /** * 基于缓存获取一下偏移标识 * * @return 偏移 */ private LoghubQueryCounterOffsetModel getNextOffsetBaseOnCache() { LoghubQueryCounterOffsetModel offsetFromCache = defaultOffsetQueryTaskCallback.getCurrentOffsetFromCache(); if(offsetFromCache == null) { return null; } LocalDateTime now = LocalDateTime.now(); LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(), now.getHour(), now.getMinute()); // 如果上次仍未内部循环完成,则使用原来的 if(offsetFromCache.getIsNewStep()) { offsetFromCache.setStartTime(offsetFromCache.getEndTime()); long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8")); offsetFromCache.setEndTime((int) endTime); } return offsetFromCache; } /** * 生成新的完整的 偏移标识 * * @return 新偏移 */ private LoghubQueryCounterOffsetModel generateNewOffset() { LoghubQueryCounterOffsetModel offsetNew = new LoghubQueryCounterOffsetModel(); LocalDateTime now = LocalDateTime.now(); LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(), now.getHour(), now.getMinute()); long startTime = nowMinTime.minusDays(1).toEpochSecond(ZoneOffset.of("+8")); long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8")); offsetNew.setStartTime((int) startTime); offsetNew.setEndTime((int) endTime); return offsetNew; } /** * 将日志返回数据 适配到数据库记录中 * * @param logItem 日志详情 * @return db数据结构对应 */ private BizDataStatisticsMin adaptStatisticsMinDbData(LogItem logItem) { ArrayList<LogContent> logContents = logItem.GetLogContents(); BizDataStatisticsMin statisticsMin1 = new BizDataStatisticsMin(); for (LogContent logContent : logContents) { switch (logContent.GetKey()) { case "totalCountMin": statisticsMin1.setStatisticsCount(Integer.valueOf(logContent.GetValue())); break; case "productCode": statisticsMin1.setProductCode(logContent.GetValue()); break; case "myDateTimeMinute": String signDtMinStr = logContent.GetValue(); String[] dateTimeArr = signDtMinStr.split(" "); String countDate = dateTimeArr[0]; String[] timeArr = dateTimeArr[1].split(":"); String countHour = timeArr[0]; String countMin = timeArr[1]; statisticsMin1.setCountDate(countDate); statisticsMin1.setCountHour(countHour); statisticsMin1.setCountMin(countMin); break; default: break; } } return statisticsMin1; } /** * 重置默认值,同时提交当前 (滚动到下一个偏移点) */ private void rolloverOffsetAndCommit() { resetOffsetDefaultSettings(); commitOffsetSync(); } /** * 提交偏移量 * */ private void commitOffsetSync() { defaultOffsetQueryTaskCallback.commit(); } }