一、前言
当发生任务失败时,Flink 需要重启失败的任务和其他受影响的任务,以使作业恢复到正常状态。
重启策略和故障转移策略用于控制任务重启。重启策略决定失败/受影响的任务是否以及何时可以重启。故障转移策略决定应该重启哪些任务来恢复作业。
二、重启策略 Restart Strategies
可以使用默认重启策略启动集群,该策略在没有定义特定于作业的重启策略时始终使用。如果作业是带重启策略提交的,那么该策略将覆盖集群的默认设置。
默认的重启策略通过 Flink 的配置文件 flink-conf.yaml
设置。配置参数 restart-strategy.type
定义使用哪种重启策略。
- 如果未启用检查点,则使用“不重启”策略。
- 如果启用了检查点但未配置重启策略,则使用固定延迟策略,最大重试次数为 Integer.MAX_VALUE。
有哪些可用的重启策略和支持的值可以参考后面的重启策略列表。
每个重启策略都有一组控制其行为的参数。这些参数的值也在配置文件中设置。每个重启策略的描述中都包含了相应配置参数的更多信息。
除了定义默认重启策略外,还可以为每个Flink作业定义特定的重启策略。重启策略可以通过在StreamExecutionEnvironment
上调用setRestartStrategy
方法以编程方式设置。
下面的示例展示了如何为作业设置固定延迟重启策略。如果作业失败,系统将尝试重启3次,每次重启尝试之间间隔10秒。
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
))
Python
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
3, # number of restart attempts
10000 # delay(millisecond)
))
下面的部分描述了重启策略的具体配置选项。
三、Fixed Delay Restart Strategy 固定延迟重启策略
固定延迟重启策略会尝试指定次数重启作业。如果超过最大尝试次数,作业会最终失败。在连续两次重启尝试之间,重启策略会等待固定时间。
该策略通过在flink-conf.yaml中设置以下配置参数作为默认策略启用:
restart-strategy.type: fixed-delay
示例:
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1
指数延迟重启策略也可以通过编程方式设置:
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.milliseconds(1),
Time.milliseconds(1000),
1.1, // exponential multiplier
Time.milliseconds(2000), // threshold duration to reset delay to its initial value
0.1 // jitter
));
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
1.1, // exponential multiplier
Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value
0.1 // jitter
))
Python
Still not supported in Python API.
四、Failure Rate Restart Strategy 失败率重启策略
失败率重启策略会在失败后重新启动作业,但当超过 failure rate (每个时间间隔的失败次数)时,作业最终会失败。在两次连续的重新启动尝试之间,重新启动策略会等待固定的时间。
通过在 flink-conf.yaml 中设置以下配置参数,默认启用此策略。
restart-strategy.type: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
故障率重启策略也可以通过编程方式设置:
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per interval
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
));
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per unit
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
))
Python
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.failure_rate_restart(
3, # max failures per interval
300000, # interval for measuring failure rate (millisecond)
10000 # dela(millisecond)
))
五、No Restart Strategy 无重启策略
作业直接失败并且不尝试重新启动。
restart-strategy.type: none
也可以通过编程方式设置不重启策略:
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
Python
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.no_restart())
六、Fallback Restart Strategy 回退重启策略
使用集群定义的重启策略。这对于启用检查点的流式传输程序很有帮助。如果没有定义其他重启策略,则默认选择固定延迟重启策略。
七、Failover Strategies 故障转移策略
Flink 支持不同的故障转移策略,可以通过 Flink 配置文件 flink-conf.yaml 中的配置参数jobmanager.execution.failover-strategy
进行配置。
Failover Strategy | Value for jobmanager.execution.failover-strategy |
---|---|
Restart all | full |
Restart pipelined region | region |
八、Restart All Failover Strategy 重新启动所有故障转移策略
此策略会重新启动作业中的所有任务以从任务失败中恢复。
九、Restart Pipelined Region Failover Strategy 重新启动管道区域故障转移策略
该策略将任务分组为不相交的区域。当检测到任务失败时,此策略会计算必须重新启动才能从故障中恢复的最小区域集。对于某些作业,与重新启动所有故障转移策略相比,这可能会导致重新启动的任务更少。
区域是一组通过管道数据交换进行通信的任务。也就是说,批量数据交换表示一个区域的边界。
- DataStream 作业或 Streaming Table/SQL 作业中的所有数据交换都是管道化的。
- 默认情况下,批处理表/SQL 作业中的所有数据交换都是批处理的。
- DataSet作业中的数据交换类型由 ExecutionMode 决定,可以通过ExecutionConfig设置。
重启地区确定如下:
- 包含失败任务的区域将重新启动。
- 如果结果分区不可用,而要重新启动的区域需要该结果分区,则生成结果分区的区域也将重新启动。
- 如果要重新启动某个区域,则其所有消费者区域也将重新启动。这是为了保证数据一致性,因为不确定性处理或分区可能会导致不同的分区。