Flink – Checkpoints

内容纲要

一、概述

检查点通过允许恢复状态和相应的流位置,来使 Flink 中的状态具有容错能力,从而为应用程序提供与无故障执行相同的语义。

请参阅 Checkpointing检查点 以了解如何为程序启用和配置检查点。

要了解检查点和savepoints保存点之间的差异,请参阅检查点与保存点 checkpoints vs. savepoints

二、检查点存储

启用检查点后,托管状态将被持久化,以确保发生故障时的一致恢复。在检查点期间保存状态的位置取决于所选的检查点存储(Checkpoint Storage)。

三、可用的检查点存储选项

Flink 开箱即用地捆绑了这些检查点存储类型:

  • JobManagerCheckpointStorage
  • FileSystemCheckpointStorage

如果配置了检查点目录,则将使用 FileSystemCheckpointStorage ,否则系统将使用 JobManagerCheckpointStorage

四、JobManagerCheckpointStorage

JobManagerCheckpointStorage 将检查点快照存储在 JobManager 的堆中。

可以将其配置为:在超过一定大小时使检查点失败,以避免 JobManager 上出现 OutOfMemoryError 。要设置此功能,用户可以实例化一个具有相应最大大小的 JobManagerCheckpointStorage

new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);

JobManagerCheckpointStorage 的限制:

  • 默认情况下,每个状态的大小限制为 5 MB。可以在 JobManagerCheckpointStorage 的构造函数中增加该值。
  • 无论配置的最大状态大小如何,状态都不能大于 Akka 帧大小(查看配置)。
  • 聚合状态必须适合 JobManager 内存。

鼓励使用 JobManagerCheckpointStorage

  • 本地开发和调试
  • 使用很少状态的作业,例如仅包含一次记录函数(Map、FlatMap、Filter 等)的作业。Kafka消费者需要很少的状态。

五、FileSystemCheckpointStorage

FileSystemCheckpointStorage 配置有文件系统 URL(类型、地址、路径),例如“hdfs://namenode:40010/flink/checkpoints”或“file:///data/flink/checkpoints”。

检查点后,它将状态快照写入配置的文件系统和目录中的文件中。最少的元数据存储在 JobManager 的内存中(或者,在高可用性模式下,存储在元数据检查点中)。

如果指定了检查点目录, FileSystemCheckpointStorage 将用于保存检查点快照。

鼓励使用 FileSystemCheckpointStorage

  • 所有高可用性设置。

还建议将 managed-memory 设置为零。这将确保为 JVM 上的用户代码分配最大量的内存。

六、保留的检查点

默认情况下不保留检查点,仅用于从失败中恢复作业。当程序被取消时,它们就会被删除。但是,您可以配置要保留的定期检查点。根据配置,当作业失败或取消时,这些保留的检查点不会自动清除。这样,如果您的工作失败,您将有一个检查点可以恢复。

CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

ExternalizedCheckpointCleanup 模式配置取消作业时检查点会发生的情况:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION :作业取消时保留检查点。请注意,在这种情况下,您必须在取消后手动清理检查点状态。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION :作业取消时删除检查点。仅当作业失败时检查点状态才可用。

七、目录结构

savepoints类似,检查点由元数据文件和一些附加数据文件(具体取决于状态后端)组成。元数据文件和数据文件存储在配置文件中通过 state.checkpoints.dir 配置的目录中,也可以在代码中为每个作业指定。

当前检查点目录布局(由FLINK-8531引入)如下:

/user-defined-checkpoint-dir
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-1/
        + --chk-2/
        + --chk-3/
        ...

SHARED 目录适用于可能属于多个检查点的状态,TASKOWNED 适用于决不能被 JobManager 删除的状态,而 EXCLUSIVE 则适用于仅属于一个检查点的状态。

检查点目录不是公共 API 的一部分,可以在未来版本中更改。

八、通过配置文件进行全局配置

state.checkpoints.dir: hdfs:///checkpoints/

在检查点配置上为每个作业进行配置

env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");

九、配置检查点存储实例

或者,可以通过指定所需的检查点存储实例来设置检查点存储,该实例允许设置低级配置,例如写入缓冲区大小。

env.getCheckpointConfig().setCheckpointStorage(
  new FileSystemCheckpointStorage("hdfs:///checkpoints-data/", FILE_SIZE_THESHOLD));

十、从保留的检查点恢复

通过使用检查点的元数据文件,可以像从保存点一样从检查点恢复作业(查看savepoints保存点恢复指南)。请注意,如果元数据文件不是独立的,则作业管理器需要有权访问它引用的数据文件(请参阅上面的目录结构)。

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]

Leave a Comment

您的电子邮箱地址不会被公开。 必填项已用*标注

close
arrow_upward