一、前言
正常情况下,对齐检查点的时间主要由检查点过程中的同步部分和异步部分所决定。然而,当Flink任务在严重的反压下运行时,检查点从头到尾的时间的决定性因素可能是将检查点屏障传播到所有算子/子任务所需的时间。这在检查点过程的概述中有解释,可以通过高的对齐时间和启动延迟指标观察到。当这种情况发生并成为一个问题时,有三种方法可以解决这个问题:
- 通过优化Flink作业、调整Flink或JVM配置或者扩容来减少反压。
- 减少Flink作业中缓冲的在途数据量。
- 开启非对齐检查点。
这些方法不是互斥的,可以组合使用。本文重点关注后两个选项。
二、缓冲区膨胀(Buffer debloating)
Flink 1.14 引入了一个新工具,可以自动控制 Flink 运算符/子任务之间缓冲的在途数据量。可以通过将taskmanager.network.memory.buffer-debloat.enabled
属性设置为true
来启用缓冲区缩减(消胀)(buffer debloating)机制。
该机制与对齐检查点和非对齐检查点都兼容,在两种情况下都可以优化检查点时间。但它对对齐检查点的效果更加明显。当与非对齐检查点一起使用时,缓冲区缩减带来的额外益处是检查点大小更小、恢复时间更短(需要持久化和恢复的在途数据更少)。
有关此功能的工作原理和配置方法的更多信息,请参阅网络内存调优指南。请注意,您也可以按照该调优指南中的描述手动减少缓冲的在途数据量。
三、未对齐的检查点
从Flink 1.11开始,检查点可以是非对齐的。非对齐检查点会将在途数据(即缓存中的数据)作为检查点状态的一部分,这样检查点屏障可以越过这些缓存。因此,检查点持续时间不再依赖于当前吞吐量,因为检查点屏障实际上不再嵌入到数据流中。
如果由于反压导致检查点持续时间很长,则应该使用非对齐检查点。这样,检查点时间就在很大程度上独立于端到端延迟。需要注意的是,非对齐检查点会增加对状态存储的IO,所以如果在检查点期间状态存储的IO才是瓶颈,则不应该使用它。
要启用非对齐检查点,可以:
java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enables the unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// enables the unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()
python
env = StreamExecutionEnvironment.get_execution_environment()
# enables the unaligned checkpoints
env.get_checkpoint_config().enable_unaligned_checkpoints()
或者在 flink-conf.yml
配置文件中:
execution.checkpointing.unaligned: true
四、对齐检查点超时
开启非对齐检查点后,您还可以通过编程方式指定对齐检查点的超时时间:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));
或者在 flink-conf.yml
配置文件中:
execution.checkpointing.aligned-checkpoint-timeout: 30 s
当该机制被激活时,每个检查点仍然会从对齐检查点开始,但是当全局检查点持续时间超过aligned-checkpoint-timeout
设定的值时,如果对齐检查点还未完成,则该检查点会以非对齐检查点的形式继续进行。
也就是说,启用该机制后,检查点的启动过程不变,仍然首先按照对齐检查点的流程开始。只有当对齐检查点超时时,才会动态切换为非对齐模式。
这种机制结合了对齐和非对齐检查点的优点,既可以主要利用非对齐模式加速检查点,又保留对齐模式作为备选,避免检查点完全失败。
五、限制
5.1 并发检查点
Flink 当前不支持同时进行多个非对齐检查点。然而,由于非对齐检查点时间更加可预测和短暂,可能完全不需要进行并发检查点。但是,保存点(savepoints)也无法与非对齐检查点同时进行,所以保存点会稍微花费更长时间。
5.2 与watermarks的相互作用
非对齐检查点会破坏关于水位线在恢复期间的隐含保证。当前,Flink是在恢复的第一步生成水位线,而不是在运算符中存储最新的水位线来方便弹性扩缩容。这意味着在非对齐检查点的恢复中,Flink是在恢复在途数据之后才生成水位线的。如果您的流式计算管道使用了在每个记录上应用最新水位线的运算符,与对齐检查点相比,会产生不同的结果。如果您的运算符依赖于最新水位线始终可用,那么解决方法是在运算符状态中存储水位线。在这种情况下,应该在联合状态中按key group存储水位线以支持弹性扩缩容。
5.3 与长时间运行的记录处理的相互作用
尽管非对齐检查点屏障可以越过队列中的其他所有记录,但如果当前记录的处理时间很长,处理该屏障仍可能被延迟。这种情况可能在一次触发许多定时器时发生,例如在窗口操作中。第二种问题场景可能发生在系统等待多个网络缓冲区的可用性以处理单个输入记录时被阻塞。Flink 不能中断单个输入记录的处理,非对齐检查点必须等待当前处理的记录完全处理完成。这可能在两种情况下造成问题。一种情况是序列化一个大记录导致不适合单个网络缓冲区,另一种情况是在flatMap操作中一个输入记录产生许多输出记录。在这些场景中,反压可能会阻塞非对齐检查点,直到处理单个输入记录所需的所有网络缓冲区可用。任何其他单记录处理时间较长的情况也可能发生此问题。结果,检查点时间可能高于预期或可能有波动。
5.4 某些数据分布模式没有检查点
有些类型的连接的属性无法与检查点中存储的通道数据保持一致。为了保留这些特征并确保没有状态损坏或意外行为,对于此类连接禁用未对齐的检查点。所有其他交易所仍然执行未对齐的检查点。
Pointwise connections 逐点连接
目前,我们对数据有序性的点连接没有任何硬性保证。然而,由于数据的隐式构造方式与任何先前的源或 keyby 相同,因此一些用户依赖此行为将计算密集型任务划分为较小的块,同时依赖于有序性保证。
只要并行性不改变,未对齐检查点 (UC) 就会保留这些属性。随着 UC 的重新调整,情况发生了变化。
考虑这样一个job作业
如果我们要从并行度p=2扩缩容到p=3,keyby通道中的记录需要根据key分组被划分到3个通道中。这可以通过使用运算符的key组范围和确定记录的key(组)的方法轻松实现(与具体方法无关)。但是对于forward通道,我们完全缺乏key上下文。forward通道中的记录都没有分配任何key组;也无法计算key组,因为不能保证key仍然存在。
Broadcast connections 广播连接
广播连接带来了另一个问题。不能保证所有通道以相同速率消费记录。如图所示,这可能导致某些任务应用了与特定广播事件对应的状态更改,而其他任务没有。
广播分区通常用于实现全运算符共享的广播状态。Flink通过只从有状态运算符的子任务0检查点该状态的单个副本来实现广播状态。在恢复时,我们会将该副本发送到所有运算符。因此,可能会发生一个运算符获得已应用某记录更改的状态,而该记录即将从其检查点通道中消费的情况。
六、故障排除(Troubleshooting)
6.1 在途数据损坏(Corrupted in-flight data)
下面描述的操作是最后的手段,因为它们会导致数据丢失。
如果在途数据被损坏或者由于其他原因需要在不恢复在途数据的情况下恢复作业,可以使用 recover-without-channel-state.checkpoint-id
属性。该属性需要指定一个检查点ID,该检查点的在途数据将被忽略。除非持久化的在途数据的损坏导致了其他情况下无法恢复的情况,否则不要设置此属性。该属性只能在作业重新部署后应用,这意味着仅当启用了外部检查点时,此操作才有意义。