Flink – Task Failure Recovery

内容纲要

一、前言

当发生任务失败时,Flink 需要重启失败的任务和其他受影响的任务,以使作业恢复到正常状态。

重启策略和故障转移策略用于控制任务重启。重启策略决定失败/受影响的任务是否以及何时可以重启。故障转移策略决定应该重启哪些任务来恢复作业。

二、重启策略 Restart Strategies

可以使用默认重启策略启动集群,该策略在没有定义特定于作业的重启策略时始终使用。如果作业是带重启策略提交的,那么该策略将覆盖集群的默认设置。

默认的重启策略通过 Flink 的配置文件 flink-conf.yaml 设置。配置参数 restart-strategy.type 定义使用哪种重启策略。

  • 如果未启用检查点,则使用“不重启”策略。
  • 如果启用了检查点但未配置重启策略,则使用固定延迟策略,最大重试次数为 Integer.MAX_VALUE。

有哪些可用的重启策略和支持的值可以参考后面的重启策略列表。

每个重启策略都有一组控制其行为的参数。这些参数的值也在配置文件中设置。每个重启策略的描述中都包含了相应配置参数的更多信息。

除了定义默认重启策略外,还可以为每个Flink作业定义特定的重启策略。重启策略可以通过在StreamExecutionEnvironment上调用setRestartStrategy方法以编程方式设置。

下面的示例展示了如何为作业设置固定延迟重启策略。如果作业失败,系统将尝试重启3次,每次重启尝试之间间隔10秒。

Java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
));

Scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))

Python

env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
    3,  # number of restart attempts
    10000  # delay(millisecond)
))

下面的部分描述了重启策略的具体配置选项。

三、Fixed Delay Restart Strategy 固定延迟重启策略

固定延迟重启策略会尝试指定次数重启作业。如果超过最大尝试次数,作业会最终失败。在连续两次重启尝试之间,重启策略会等待固定时间。

该策略通过在flink-conf.yaml中设置以下配置参数作为默认策略启用:

restart-strategy.type: fixed-delay
Key Default Type Description
restart-strategy.exponential-delay.backoff-multiplier 2.0 Double 如果restart-strategy.type设置为exponential-delay,则每次失败后,延迟值将乘以该配置值进行退避,直到达到最大退避延迟。
restart-strategy.exponential-delay.initial-backoff 1 s Duration 如果restart-strategy.type设置为exponential-delay,则该参数指定重启间的起始延迟。可以使用"1 min"、"20 s"等格式指定。
restart-strategy.exponential-delay.jitter-factor 0.1 Double 如果restart-strategy.type设置为exponential-delay,则该参数指定了退避时间的抖动比例。它表示会向退避时间添加或减去一个什么大小的随机值。当希望避免多个作业同时重启时很有用。
restart-strategy.exponential-delay.max-backoff 5 min Duration 如果restart-strategy.type设置为exponential-delay,则这个参数指定重启间的最长可能延迟。可以使用"1 min"、"20 s"等格式指定。
restart-strategy.exponential-delay.reset-backoff-threshold 1 h Duration 如果restart-strategy.type设置为exponential-delay,则这个参数指定重置退避值到初始值的阈值。它指定了作业在没有失败的情况下运行多长时间后,将按指数增加的退避重置为初始值。可以使用"1 min"、"20 s"等格式指定。

示例:

restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1

指数延迟重启策略也可以通过编程方式设置:

Java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
  Time.milliseconds(1),
  Time.milliseconds(1000),
  1.1, // exponential multiplier
  Time.milliseconds(2000), // threshold duration to reset delay to its initial value
  0.1 // jitter
));

Scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
  Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
  Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
  1.1, // exponential multiplier
  Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value
  0.1 // jitter
))

Python

Still not supported in Python API.

四、Failure Rate Restart Strategy 失败率重启策略

失败率重启策略会在失败后重新启动作业,但当超过 failure rate (每个时间间隔的失败次数)时,作业最终会失败。在两次连续的重新启动尝试之间,重新启动策略会等待固定的时间。

通过在 flink-conf.yaml 中设置以下配置参数,默认启用此策略。

restart-strategy.type: failure-rate
Key Default Type Description
restart-strategy.failure-rate.delay 1 s Duration 如果restart-strategy.type设置为failure-rate,则表示两次连续重启尝试之间的延迟。可以使用“1 min”、“20 s”等格式指定。
restart-strategy.failure-rate.failure-rate-interval 1 min Duration 如果restart-strategy.type设置为failure-rate,则这个参数指定测量失败率的时间间隔。可以使用“1 min”、“20 s”等格式指定。
restart-strategy.failure-rate.max-failures-per-interval 1 Integer 如果restart-strategy.type设置为failure-rate,则这个参数指定在给定时间间隔内作业失败前的最大重启次数。
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

故障率重启策略也可以通过编程方式设置:

Java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per interval
  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
));

Scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per unit
  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
))

Python

env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.failure_rate_restart(
    3,  # max failures per interval
    300000,  # interval for measuring failure rate (millisecond)
    10000  # dela(millisecond)
))

五、No Restart Strategy 无重启策略

作业直接失败并且不尝试重新启动。

restart-strategy.type: none

也可以通过编程方式设置不重启策略:

Java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

Scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

Python

env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.no_restart())

六、Fallback Restart Strategy 回退重启策略

使用集群定义的重启策略。这对于启用检查点的流式传输程序很有帮助。如果没有定义其他重启策略,则默认选择固定延迟重启策略。

七、Failover Strategies 故障转移策略

Flink 支持不同的故障转移策略,可以通过 Flink 配置文件 flink-conf.yaml 中的配置参数jobmanager.execution.failover-strategy 进行配置。

Failover Strategy Value for jobmanager.execution.failover-strategy
Restart all full
Restart pipelined region region

八、Restart All Failover Strategy 重新启动所有故障转移策略

此策略会重新启动作业中的所有任务以从任务失败中恢复。

九、Restart Pipelined Region Failover Strategy 重新启动管道区域故障转移策略

该策略将任务分组为不相交的区域。当检测到任务失败时,此策略会计算必须重新启动才能从故障中恢复的最小区域集。对于某些作业,与重新启动所有故障转移策略相比,这可能会导致重新启动的任务更少。

区域是一组通过管道数据交换进行通信的任务。也就是说,批量数据交换表示一个区域的边界。

  • DataStream 作业或 Streaming Table/SQL 作业中的所有数据交换都是管道化的。
  • 默认情况下,批处理表/SQL 作业中的所有数据交换都是批处理的。
  • DataSet作业中的数据交换类型由 ExecutionMode 决定,可以通过ExecutionConfig设置。

重启地区确定如下:

  1. 包含失败任务的区域将重新启动。
  2. 如果结果分区不可用,而要重新启动的区域需要该结果分区,则生成结果分区的区域也将重新启动。
  3. 如果要重新启动某个区域,则其所有消费者区域也将重新启动。这是为了保证数据一致性,因为不确定性处理或分区可能会导致不同的分区。

Leave a Comment

您的电子邮箱地址不会被公开。 必填项已用*标注

close
arrow_upward