如果你了解 Apache Flink 的话,那么你应该熟悉该如何像 Flink 发送数据或者如何从 Flink 获取数据。但是在某些情况下,我们需要将配置数据发送到 Flink 集群并从中接收一些额外的数据。
在本文的第一部分中,我将描述如何将配置数据发送到 Flink 集群。我们需要配置很多东西:方法参数、配置文件、机器学习模型。Flink 提供了几种不同的方法,我们将介绍如何使用它们以及何时使用它们。在本文的第二部分中,我将描述如何从 Flink 集群中获取数据。
如何发送数据给 TaskManager?在我们深入研究如何在 Apache Flink 中的不同组件之间发送数据之前,让我们先谈谈 Flink 集群中的组件,下图展示了 Flink 中的主要组件以及它们是如何相互作用的:
当我们运行 Flink 应用程序时,它会与 Flink JobManager 进行交互,这个 Flink JobManager 存储了那些正在运行的 Job 的详细信息,例如执行图。
JobManager 它控制着 TaskManager,每个 TaskManager 中包含了一部分数据来执行我们定义的数据处理方法。
在许多的情况下,我们希望能够去配置 Flink Job 中某些运行的函数参数。根据用例,我们可能需要设置单个变量或者提交具有静态配置的文件,我们下面将讨论在 Flink 中该如何实现?
除了向 TaskManager 发送配置数据外,有时我们可能还希望从 Flink Job 的函数方法中返回数据。
如何配置用户自定义函数?假设我们有一个从 CSV 文件中读取电影列表的应用程序(它要过滤特定类型的所有电影):
//读取电影列表数据集合 DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv") .ignoreFirstLine() .parseQuotedStrings('"') .ignoreInvalidLines() .types(Long.class, String.class, String.class); lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> { // 以“|”符号分隔电影类型 String[] genres = movie.f2.split("\\|"); // 查找所有 “动作” 类型的电影 return Stream.of(genres).anyMatch(g -> g.equals("Action")); }).print();我们很可能想要提取不同类型的电影,为此我们需要能够配置我们的过滤功能。 当你要实现这样的函数时,最直接的配置方法是实现构造函数:
// 传递类型名称 lines.filter(new FilterGenre("Action")) .print(); ... class FilterGenre implements FilterFunction<Tuple3<Long, String, String>> { //类型 String genre; //初始化构造方法 public FilterGenre(String genre) { this.genre = genre; } @Override public boolean filter(Tuple3<Long, String, String> movie) throws Exception { String[] genres = movie.f2.split("\\|"); return Stream.of(genres).anyMatch(g -> g.equals(genre)); } }或者,如果你使用 lambda 函数,你可以简单地使用它的闭包中的一个变量:
final String genre = "Action"; lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> { String[] genres = movie.f2.split("\\|"); //使用变量 return Stream.of(genres).anyMatch(g -> g.equals(genre)); }).print();Flink 将序列化此变量并将其与函数一起发送到集群。
如果你需要将大量变量传递给函数,那么这些方法就会变得非常烦人了。 为了解决这个问题,Flink 提供了 withParameters 方法。 要使用它,你需要实现那些 Rich 函数,比如你不必实现 MapFunction 接口,而是实现 RichMapFunction。
Rich 函数允许你使用 withParameters 方法传递许多参数:
// Configuration 类来存储参数 Configuration configuration = new Configuration(); configuration.setString("genre", "Action"); lines.filter(new FilterGenreWithParameters()) // 将参数传递给函数 .withParameters(configuration) .print();要读取这些参数,我们需要实现 "open" 方法并读取其中的参数:
class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> { String genre; @Override public void open(Configuration parameters) throws Exception { //读取配置 genre = parameters.getString("genre", ""); } @Override public boolean filter(Tuple3<Long, String, String> movie) throws Exception { String[] genres = movie.f2.split("\\|"); return Stream.of(genres).anyMatch(g -> g.equals(genre)); } }所有这些选项都可以使用,但如果需要为多个函数设置相同的参数,则可能会很繁琐。在 Flink 中要处理此种情况, 你可以设置所有 TaskManager 都可以访问的全局环境变量。
为此,首先需要使用 ParameterTool.fromArgs 从命令行读取参数:
public static void main(String... args) { //读取命令行参数 ParameterTool parameterTool = ParameterTool.fromArgs(args); ... }