要使用累加器,我们需要创建并注册一个用户定义的函数,然后在客户端上读取结果。下面我们来看看该如何使用呢:
lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() { //创建一个累加器 private IntCounter linesNum = new IntCounter(); @Override public void open(Configuration parameters) throws Exception { //注册一个累加器 getRuntimeContext().addAccumulator("linesNum", linesNum); } @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = line.split("\\W+"); for (String word : words) { out.collect(new Tuple2<>(word, 1)); } // 处理每一行数据后 linesNum 递增 linesNum.add(1); } }) .groupBy(0) .sum(1) .print(); //获取累加器结果 int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum"); System.out.println(linesNum);这样计算就可以统计输入文本中每个单词出现的次数以及它有多少行。
如果需要自定义累加器,还可以使用 Accumulator 或 SimpleAccumulator 接口实现自己的累加器。
最后本篇文章由 zhisheng 翻译,禁止任何无授权的转载。
翻译后地址:
原文地址:https://brewing.codes/2017/10/24/flink-additional-data/
本文部分代码地址:https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-examples/src/main/java/com/zhisheng/examples/batch/accumulator
微信公众号:zhisheng
另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。
更多私密资料请加入知识星球!
博客1、Flink 从0到1学习 —— Apache Flink 介绍
2、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3、Flink 从0到1学习 —— Flink 配置文件详解
4、Flink 从0到1学习 —— Data Source 介绍
5、Flink 从0到1学习 —— 如何自定义 Data Source ?
6、Flink 从0到1学习 —— Data Sink 介绍
7、Flink 从0到1学习 —— 如何自定义 Data Sink ?
8、Flink 从0到1学习 —— Flink Data transformation(转换)
9、Flink 从0到1学习 —— 介绍 Flink 中的 Stream Windows
10、Flink 从0到1学习 —— Flink 中的几种 Time 详解
11、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 ElasticSearch
12、Flink 从0到1学习 —— Flink 项目如何运行?
13、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Kafka
14、Flink 从0到1学习 —— Flink JobManager 高可用性配置
15、Flink 从0到1学习 —— Flink parallelism 和 Slot 介绍
16、Flink 从0到1学习 —— Flink 读取 Kafka 数据批量写入到 MySQL
17、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RabbitMQ
18、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HBase
19、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HDFS
20、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Redis
21、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Cassandra
22、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Flume
23、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 InfluxDB
24、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RocketMQ
25、Flink 从0到1学习 —— 你上传的 jar 包藏到哪里去了
26、Flink 从0到1学习 —— 你的 Flink job 日志跑到哪里去了
27、阿里巴巴开源的 Blink 实时计算框架真香
28、Flink 从0到1学习 —— Flink 中如何管理配置?
29、Flink 从0到1学习—— Flink 不可以连续 Split(分流)?
30、Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文
31、Flink 架构、原理与部署测试
32、为什么说流处理即未来?
33、OPPO 数据中台之基石:基于 Flink SQL 构建实时数据仓库
34、流计算框架 Flink 与 Storm 的性能对比
35、Flink状态管理和容错机制介绍
36、Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
37、360深度实践:Flink与Storm协议级对比
38、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了
39、Apache Flink 1.9 重大特性提前解读
40、Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)
41、Flink 灵魂两百问,这谁顶得住?
源码解析1、Flink 源码解析 —— 源码编译运行
2、Flink 源码解析 —— 项目结构一览
3、Flink 源码解析—— local 模式启动流程
4、Flink 源码解析 —— standalone session 模式启动流程
5、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动
6、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动
7、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程
8、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程
9、Flink 源码解析 —— 如何获取 JobGraph?
10、Flink 源码解析 —— 如何获取 StreamGraph?
11、Flink 源码解析 —— Flink JobManager 有什么作用?
12、Flink 源码解析 —— Flink TaskManager 有什么作用?
13、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程
14、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程
15、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制
16、Flink 源码解析 —— 深度解析 Flink 序列化机制
17、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?
18、Flink Metrics 源码解析 —— Flink-metrics-core
19、Flink Metrics 源码解析 —— Flink-metrics-datadog
20、Flink Metrics 源码解析 —— Flink-metrics-dropwizard
21、Flink Metrics 源码解析 —— Flink-metrics-graphite
22、Flink Metrics 源码解析 —— Flink-metrics-influxdb
23、Flink Metrics 源码解析 —— Flink-metrics-jmx
24、Flink Metrics 源码解析 —— Flink-metrics-slf4j
25、Flink Metrics 源码解析 —— Flink-metrics-statsd
26、Flink Metrics 源码解析 —— Flink-metrics-prometheus
26、Flink Annotations 源码解析
27、Flink 源码解析 —— 如何获取 ExecutionGraph ?
28、大数据重磅炸弹——实时计算框架 Flink
29、Flink Checkpoint-轻量级分布式快照
30、Flink Clients 源码解析原文出处:zhisheng的博客,欢迎关注我的公众号:zhisheng