Flink 专题 -2 Checkpoint、Savepoint 机制 (2)

为了能够在将来升级你的程序在本节中描述。主要的必要更改是通过该uid(String)方法手动指定操作员ID 。这些ID用于确定每个运算符的状态。
java:

DataStream<String> stream = env. // Stateful source (e.g. Kafka) with ID .addSource(new StatefulSource()) .uid("source-id") // ID for the source operator .shuffle() // Stateful mapper with ID .map(new StatefulMapper()) .uid("mapper-id") // ID for the mapper // Stateless printing sink .print(); // Auto-generated ID

如果您未手动指定ID,则会自动生成这些ID。只要这些ID不变,您就可以从保存点自动恢复。生成的ID取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些ID。

Savepoint State

触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制此目录的位置

保存Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]
这将触发具有ID的作业的保存点:jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

在yarn 集群中保存Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
这将触发具有ID :jobId和YARN应用程序ID 的作业的保存点:yarnAppId,并返回创建的保存点的路径。

使用 Savepoint 取消job

$ bin/flink cancel -s [:targetDirectory] :jobId
这将以原子方式触发具有ID的作业的保存点:jobid并取消作业。此外,您可以指定目标文件系统目录以存储保存点。该目录需要可由JobManager和TaskManager访问。

Resuming Savepoint

$ bin/flink run -s :savepointPath [:runArgs]
这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或_metadata文件的路径。

允许未恢复状态启动

$ bin/flink run -s :savepointPath -n [:runArgs]
默认情况下,resume操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了运算符,则可以通过--allowNonRestoredState(short -n:)选项跳过无法映射到新程序的状态:

全局配置

您可以通过state.savepoints.dir 配置文件设置默认savepoint 位置 。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅:targetDirectory参数)。
flink-conf.yaml

# Default savepoint target directory state.savepoints.dir: hdfs:///flink/savepoints

参考地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjgps.html