Flink CheckPoints 检查点机制

Flink 的 Checkpoint 机制是其可靠性的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport(分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

一、Flink CheckPoints

Flink CheckPoints 检查点机制, 使 Flink 的状态具有良好的容错性.

Flink 的 Checkpoint 机制是其可靠性的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport(分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

Flink Checkpoint的核心元素就是数据流Barrier

Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

Barrier将数据流中的数据切分为进入当前Checkpoint的部分和进入下一次Checkpoint的部分,每个Barrier都携带对应Checkpoint的ID。Barrier是非常轻量级的,不会中断数据流的处理。

1、开启检查点

默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

Checkpoint 其他的属性包括:

  • 精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择使用两种保证等级中的哪一种。对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
  • checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。
  • checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000,无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。
  • 并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。
  • externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 Externalized checkpoints 的部署文档。
  • 在 checkpoint 出错时使 task 失败或者继续进行 task:他决定了在 task checkpoint 的过程中发生错误时,是否使 task 也失败,使失败是默认的行为。 或者禁用它时,这个任务将会简单的把 checkpoint 错误信息报告给 checkpoint coordinator 并继续运行。
  • 优先从 checkpoint 恢复(prefer checkpoint for recovery):该属性确定 job 是否在最新的 checkpoint 回退,即使有更近的 savepoint 可用,这可以潜在地减少恢复时间(checkpoint 恢复比 savepoint 恢复更快)。
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 每 1000ms 开始一次 checkpoint
    env.enableCheckpointing(1000);
    // 高级选项:
    // 设置模式为精确一次 (这是默认值)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 确认 checkpoints 之间的时间会进行 500 ms
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // 同一时间只允许一个 checkpoint 进行
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // 开启在 job 中止后仍然保留的 externalized checkpoints
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // 允许在有更近 savepoint 时回退到 checkpoint
    env.getCheckpointConfig().setPreferCheckpointForRecovery(true)

2、保存点机制

保存点机制 (Savepoints) 是检查点机制的一种特殊的实现,它允许通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失。示例如下:

    # 触发指定id的作业的Savepoint,并将结果存储到指定目录下
    bin/flink savepoint :jobId [:targetDirectory]

3 对齐的barrier

Exactly Once语义下,必须进行barrier的对齐,而 At Least Once语义下 barrier 可以不对齐。

barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如 reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对齐这个概念,都会实现Exactly Once语义,即使程序中配置了At Least Once 。

也就是说,当operator有多个输入流的时候,它必须要收到所有输入流的barrier之后,才能向下游传递barrier。这就造成operator处于block等待状态,可能会影响整个作业的处理性能。

1. Job Manager中的Checkpoint Coordinator向所有source端发送触发Checkpoint的通知,并在source端注入barrier事件。
2. Source端向下游传递barrier,并将自己的状态异步地写入到持久化存储中。
3. Operator接收到source端传递的barrier之后,会对operator的输入流进行对齐barrier,然后向输出流传递barrier,并将自己的状态异步的写入到持久化存储中。
4. 当sink端接收到所有输入流传递过来的barrier之后,就会向Checkpoint Coordinator通知,此次Checkpoint执行完成。

Flink在整个Checkpoint的执行过程中,不仅会存储此次Checkpoint的状态数据,同时也存储每个算子的状态数据。

从Flink 1.11版本开始,Checkpoint也可以不对齐地执行了。这种情况下,Flink同样会在source端注入barrier,但只在sink端进行对齐,中间的operator在接收到barrier之后立即传递给它的下游operator

二、状态后端

Flink 提供了多种 state backends,它用于指定状态的存储方式和位置。

状态可以位于 Java 的堆或堆外内存。取决于 state backend,Flink 也可以自己管理应用程序的状态。为了让应用程序可以维护非常大的状态,Flink 可以自己管理内存(如果有必要可以溢写到磁盘)。默认情况下,所有 Flink Job 会使用配置文件 flink-conf.yaml 中指定的 state backend。

但是,配置文件中指定的默认 state backend 会被 Job 中指定的 state backend 覆盖。

1、状态管理器分类

MemoryStateBackend

默认的方式,即基于 JVM 的堆内存进行存储,主要适用于本地开发和调试。

FsStateBackend

基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

RocksDBStateBackend

RocksDBStateBackend 是 Flink 内置的第三方状态管理器,采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时,再将其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。之所以这样做是因为 RocksDB 作为嵌入式数据库安全性比较低,但比起全文件系统的方式,其读取速率更快;比起全内存的方式,其存储空间更大,因此它是一种比较均衡的方案。

2、配置方式

Flink 支持使用两种方式来配置后端管理器:

第一种方式:基于代码方式进行配置,只对当前作业生效:

    // 配置 FsStateBackend
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    // 配置 RocksDBStateBackend
    env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

配置 RocksDBStateBackend 时,需要额外导入下面的依赖:

    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>

第二种方式:基于 flink-conf.yaml 配置文件的方式进行配置,对所有部署在该集群上的作业都生效:

    state.backend: filesystem
    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

三、状态一致性

1、端到端(end-to-end)

在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:

内部保证:依赖checkpoint

source 端:需要外部源可重设数据的读取位置

sink 端:需要保证从故障恢复时,数据不会重复写入外部系统。

而对于sink端,又有两种具体的实现方式:

幂等(Idempotent)写入:所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

事务性(Transactional)写入:需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。

对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。

Flink DataStream API 提供了GenericWriteAheadSink 模板类和 TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。

2、Flink+Kafka 实现端到端的 exactly-once语义

端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

内部:利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性

source:kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

sink:kafka producer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction内部的checkpoint机制。

Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

  1. 当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。
  2. 每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。
  3. 每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。
  4. sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。
  5. 当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。

执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

  • 第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”, jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
  • sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
  • jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
  • sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
  • 外部kafka关闭事务,提交的数据可以正常消费了。

Scala 下划线 介绍

scala 下划线 是每个学习这门语言的童鞋必不能错过的知识点。Scala单作为一门语言来看, 非常的简洁高效,在Scala中存在很多让代码更加简洁的语法,下划线“_”便是其中一个。

  1. 作为“通配符”,类似Java中的*。如import scala.math._
  2. :_*([[冒号下划线星]])作为一个整体,告诉编译器你希望将某个参数当作参数序列处理!例如val s = sum(1 to 5:_*)就是将1 to 5当作参数序列处理。
  3. 指代一个集合中的每个元素。a.filter(_%2\==0).map(2*),表示Array a中筛出偶数,并乘以2;如要对缓冲数组ArrayBuffer b排序 val bSorted = b.sorted()
  4. 在元组中,可以用方法_1, _2, _3访问组员。如a._2。其中句点可以用空格替代
    val t = (1, 2, 3) 
    println(t._1, t._2, t._3)
  5. 使用模式匹配可以用来获取元组的组员
    // 比如上一例中val (first, second, _) = t
    val (first, second, third) = t
  6. 划线_代表的是某一类型的默认值
    //String类型的默认值为null 
    var s: String = _
    //Double类型的默认值是 0.0
    var a: Double = _
  7. 匹配集合元素, [[下划线星]]表示变长参数队列
    //匹配以0开头,长度为三的列表 
    expr match { 
    case List(0, _, _) => println("found it") 
    case _ => 
    } 
    //匹配以0开头,长度任意的列表 
    expr match { 
    case List(0, _*) => println("found it") 
    case _ => 
    } 
    //匹配元组元素 
    expr match { 
    case (0, _) => println("found it") 
    case _ => 
    } 
    //将首元素赋值给head变量 
    val List(head, _*) = List("a")
    // 默认匹配
    str match{ 
    case "1" => println("match 1") 
    case _ => println("match default") 
    }
  8. [[function literal]]简写函数字面

    数的参数在函数体内只出现一次,则可以使用下划线代替

    val f1 = (_: Int) + (_: Int) 
    //等价于 
    val f2 = (x: Int, y: Int) => x + y 
    list.foreach(println(_)) 
    //等价于 
    list.foreach(e => println(e)) 
    list.filter(_ > 0) 
    //等价于 
    list.filter(x => x > 0)
  9. 定义一元操作符

    Scala中,操作符其实就是方法,例如1 + 1等价于1.+(1);利用下划线我们可以定义自己的左置操作符

    -2 
    //等价于 
    2.unary_-
  10. 定义赋值操作符

    通过下划线实现赋值操作符

    class Foo { 
    def name = { "foo" } 
    def name_=(str: String) { 
        println("set name " + str) 
    } 
    val m = new Foo()
    m.name = "Foo" 
    //等价于: 
    m.name_=("Foo")
  11. [[partially applied function]] 定义部分应用函数

    可以为某个函数只提供部分参数进行调用,返回的结果是一个新的函数,即部分应用函数

    def sum(a: Int, b: Int, c: Int) = a + b + c 
    val b = sum(1, _: Int, 3) 
    b: Int => Int = <function1> 
    b(2) //6
  12. 将方法转换成函数

    Scala中方法和函数是两个不同的概念,方法无法作为参数进行传递,也无法赋值给变量,但是函数是可以的, 在Scala中,利用下划线可以将方法转换成函数

    //将println方法转换成函数,并赋值给p 
    val p = println _ 
    //p: (Any) => Unit