然后使用 setGlobalJobParameters 设置全局作业参数:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameterTool); ... //该函数将能够读取这些全局参数 lines.filter(new FilterGenreWithGlobalEnv()) //这个函数是自己定义的 .print();现在我们来看看这个读取这些参数的函数,和上面说的一样,它是一个 Rich 函数:
class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> { @Override public boolean filter(Tuple3<Long, String, String> movie) throws Exception { String[] genres = movie.f2.split("\\|"); //获取全局的配置 ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); //读取配置 String genre = parameterTool.get("genre"); return Stream.of(genres).anyMatch(g -> g.equals(genre)); } }要读取配置,我们需要调用 getGlobalJobParameter 来获取所有全局参数,然后使用 get 方法获取我们要的参数。
广播变量如果你想将数据从客户端发送到 TaskManager,上面文章中讨论的方法都适合你,但如果数据以数据集的形式存在于 TaskManager 中,该怎么办? 在这种情况下,最好使用 Flink 中的另一个功能 —— 广播变量。 它只允许将数据集发送给那些执行你 Job 里面函数的任务管理器。
假设我们有一个数据集,其中包含我们在进行文本处理时应忽略的单词,并且我们希望将其设置为我们的函数。 要为单个函数设置广播变量,我们需要使用 withBroadcastSet 方法和数据集。
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3); // 获取要忽略的单词集合 DataSet<String> wordsToIgnore = ... data.map(new RichFlatMapFunction<String, String>() { // 存储要忽略的单词集合. 这将存储在 TaskManager 的内存中 Collection<String> wordsToIgnore; @Override public void open(Configuration parameters) throws Exception { //读取要忽略的单词的集合 wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore"); } @Override public String map(String line, Collector<String> out) throws Exception { String[] words = line.split("\\W+"); for (String word : words) //使用要忽略的单词集合 if (wordsToIgnore.contains(word)) out.collect(new Tuple2<>(word, 1)); } //通过广播变量传递数据集 }).withBroadcastSet(wordsToIgnore, "wordsToIgnore");你应该记住,如果要使用广播变量,那么数据集将会存储在 TaskManager 的内存中,如果数据集和越大,那么占用的内存就会越大,因此使用广播变量适用于较小的数据集。
如果要向每个 TaskManager 发送更多数据并且不希望将这些数据存储在内存中,可以使用 Flink 的分布式缓存向 TaskManager 发送静态文件。 要使用 Flink 的分布式缓存,你首先需要将文件存储在一个分布式文件系统(如 HDFS)中,然后在缓存中注册该文件:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //从 HDFS 注册文件 env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel") ... env.execute()为了访问分布式缓存,我们需要实现一个 Rich 函数:
class MyClassifier extends RichMapFunction<String, Integer> { @Override public void open(Configuration config) { File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel"); ... } @Override public Integer map(String value) throws Exception { ... } }请注意,要访问分布式缓存中的文件,我们需要使用我们用于注册文件的 key,比如上面代码中的 machineLearningModel。
Accumulator(累加器)我们前面已经介绍了如何将数据发送给 TaskManager,但现在我们将讨论如何从 TaskManager 中返回数据。 你可能想知道为什么我们需要做这种事情。 毕竟,Apache Flink 就是建立数据处理流水线,读取输入数据,处理数据并返回结果。
为了表达清楚,让我们来看一个例子。假设我们需要计算每个单词在文本中出现的次数,同时我们要计算文本中有多少行:
//要处理的数据集合 DataSet<String> lines = ... // Word count 算法 lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = line.split("\\W+"); for (String word : words) { out.collect(new Tuple2<>(word, 1)); } } }) .groupBy(0) .sum(1) .print(); // 计算要处理的文本中的行数 int linesCount = lines.count() System.out.println(linesCount);问题是如果我们运行这个应用程序,它将运行两个 Flink 作业!首先得到单词统计数,然后计算行数。
这绝对是低效的,但我们怎样才能避免这种情况呢?一种方法是使用累加器。它们允许你从 TaskManager 发送数据,并使用预定义的功能聚合此数据。 Flink 有以下内置累加器:
IntCounter,LongCounter,DoubleCounter:允许将 TaskManager 发送的 int,long,double 值汇总在一起
AverageAccumulator:计算双精度值的平均值
LongMaximum,LongMinimum,IntMaximum,IntMinimum,DoubleMaximum,DoubleMinimum:累加器,用于确定不同类型的最大值和最小值
直方图 - 用于计算 TaskManager 的值分布