Flink学习笔记(十)Flink容错机制

文章目录

    • 10. Flink容错机制
      • 10.1 检查点(Checkpoint)
        • 10.1.1 检查点的保存
        • 10.1.2 从检查点恢复状态
        • 10.1.3 检查点算法
        • 10.1.4 检查点配置
        • 10.1.5 保存点(Savepoint)
      • 10.2 状态一致性

10. Flink容错机制

在分布式架构中,当某个节点出现故障,其他节点基本不受影响。这时只需要重启应用, 恢复之前某个时间点的状态继续处理就可以了。这一切看似简单,可是在实时流处理中,我们 不仅需要保证故障后能够重启继续运行,还要保证结果的正确性、故障恢复的速度、对处理性 能的影响,这就需要在架构上做出更加精巧的设计。

在 Flink 中,有一套完整的容错机制(fault tolerance)来保证故障后的恢复,其中最重要 的就是检查点(checkpoint)。

10.1 检查点(Checkpoint)

在流处理中,将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点” (checkpoint)。

10.1.1 检查点的保存

  1. 周期性的触发保存

在 Flink 中,检 查点的保存是周期性触发的,间隔时间可以进行设置。当每隔一段时间检查点保存操作被触发时, 就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了 检查点。

  1. 保存的时间点

当所有任务都恰好处理完一个相同的输入数据的时候,将它们的 状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。 其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状 态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之 前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source) 任务向数据源重新提交偏移量、请求重放数据就可以了。

  1. 保存的具体流程

    检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。

当我们需要保存检查点(checkpoint)时,就是在所有任务处理完同一条数据后,对状态做个快照保存下来。至于具体保存到哪里,这是由状态后端的配置 项 “ 检 查 点 存 储 ”( CheckpointStorage )来决定的,可以有作业管理器的堆内存 (JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)两种选择。一般情况下,会将检查点写入持久化的分布式文件系统。

10.1.2 从检查点恢复状态

在运行流处理程序时,Flink 会周期性地保存检查点。当发生故障时,就需要找到最近一 次成功保存的检查点来恢复状态。

(1)重启应用

(2)读取检查点,重置状态

(3)重放数据

(4)继续处理数据

10.1.3 检查点算法

Flink 保存检查点的时间点,是所有任务都处理完同一个输入数据的时候。但是数据经过任务处理之后类型和值都会发生变化,不同的任务怎么知道处理的是“同一个”数据呢?一个简单的想法是,当接到 JobManager 发出的保存检查点的指令后,Source 算子任务处 理完当前数据就暂停等待,不再读取新的数据了。所以更好的做法是,在不暂停整体流处理的前提下,将状态备份保存到检查点。在 Flink 中,采用了基于 Chandy-Lamport 算法的分布式快照。

(1)JobManager 发送指令,触发检查点的保存;Source 任务保存状态,插入分界线

JobManager 会周期性地向每个 TaskManager 发送一条带有新检查点 ID 的消息,通过这 种方式来启动检查点。收到指令后,TaskManger 会在所有 Source 任务中插入一个分界线 (barrier),并将偏移量保存到远程的持久化存储中。

(2)状态快照保存完成,分界线向下游传递

状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后像数据一样把 barrier 向下游任务传递。

(3)向下游多个并行子任务广播分界线,执行分界线对齐

(4)分界线对齐后,保存状态到持久化存储

各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成 之后,同样将 barrier 向下游继续传递,并通知 JobManager 保存完毕。

(5)先处理缓存数据,然后正常继续处理。

10.1.4 检查点配置

  1. 启用检查点

默认情况下,Flink 程序是禁用检查点的。如果想要为 Flink 应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔 1 秒启动一次检查点保存
env.enableCheckpointing(1000);
  1. 检查点存储(Checkpoint Storage)

检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默 认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink 也提供了在其他存储位置进行保存的接口,这就是 CheckpointStorage。

// 配置存储检查点到 JobManager 堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
  1. 其他高级配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点,间隔时间 1 秒
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 设置精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔时间 500 毫秒
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// 超时时间 1 分钟
checkpointConfig.setCheckpointTimeout(60000);
// 同时只能有一个检查点
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 开启检查点的外部持久化保存,作业取消后依然保留
checkpointConfig.enableExternalizedCheckpoints(
 ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 启用不对齐的检查点保存方式
checkpointConfig.enableUnalignedCheckpoints();
// 设置检查点存储,可以直接传入一个 String,指定文件系统的路径
checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")

10.1.5 保存点(Savepoint)

除了检查点(checkpoint)外,Flink 还提供了另一个非常独特的镜像保存功能——保存点 (Savepoint)。它的原理和算法与检查点完全相同,只是多 了一些额外的元数据。事实上,保存点就是通过检查点的机制来创建流式作业状态的一致性镜 像(consistent image)的。

  1. 保存点的用途

保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建, 发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必 须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途 就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用 来做有计划的手动备份和恢复。

(1)版本管理和归档存储

(2)更新 Flink 版本

(3)更新应用程序

(4)调整并行度

(5)暂停应用程序

  1. 使用保存点

(1)创建保存点

bin/flink savepoint :jobId [:targetDirectory]

jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点 存储的路径。

对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设 定:

state.savepoints.dir: hdfs:///flink/savepoints

(2)从保存点重启应用

bin/flink run -s :savepointPath [:runArgs]

10.2 状态一致性

简单来讲,一致性其实就是结果的正确性。对于分布式系统而言,强调的是不同节点中相 同数据的副本应该总是“一致的”,也就是从不同节点读取时总能得到相同的值。

你可能感兴趣的