Flink CheckPoints 检查点机制

Flink 的 Checkpoint 机制是其可靠性的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport(分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

一、Flink CheckPoints

Flink CheckPoints 检查点机制, 使 Flink 的状态具有良好的容错性.

Flink 的 Checkpoint 机制是其可靠性的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport(分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

Flink Checkpoint的核心元素就是数据流Barrier

Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

Barrier将数据流中的数据切分为进入当前Checkpoint的部分和进入下一次Checkpoint的部分,每个Barrier都携带对应Checkpoint的ID。Barrier是非常轻量级的,不会中断数据流的处理。

1、开启检查点

默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

Checkpoint 其他的属性包括:

  • 精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择使用两种保证等级中的哪一种。对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
  • checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。
  • checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000,无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。
  • 并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。
  • externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 Externalized checkpoints 的部署文档。
  • 在 checkpoint 出错时使 task 失败或者继续进行 task:他决定了在 task checkpoint 的过程中发生错误时,是否使 task 也失败,使失败是默认的行为。 或者禁用它时,这个任务将会简单的把 checkpoint 错误信息报告给 checkpoint coordinator 并继续运行。
  • 优先从 checkpoint 恢复(prefer checkpoint for recovery):该属性确定 job 是否在最新的 checkpoint 回退,即使有更近的 savepoint 可用,这可以潜在地减少恢复时间(checkpoint 恢复比 savepoint 恢复更快)。
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 每 1000ms 开始一次 checkpoint
    env.enableCheckpointing(1000);
    // 高级选项:
    // 设置模式为精确一次 (这是默认值)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 确认 checkpoints 之间的时间会进行 500 ms
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // 同一时间只允许一个 checkpoint 进行
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // 开启在 job 中止后仍然保留的 externalized checkpoints
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // 允许在有更近 savepoint 时回退到 checkpoint
    env.getCheckpointConfig().setPreferCheckpointForRecovery(true)

2、保存点机制

保存点机制 (Savepoints) 是检查点机制的一种特殊的实现,它允许通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失。示例如下:

    # 触发指定id的作业的Savepoint,并将结果存储到指定目录下
    bin/flink savepoint :jobId [:targetDirectory]

3 对齐的barrier

Exactly Once语义下,必须进行barrier的对齐,而 At Least Once语义下 barrier 可以不对齐。

barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如 reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对齐这个概念,都会实现Exactly Once语义,即使程序中配置了At Least Once 。

也就是说,当operator有多个输入流的时候,它必须要收到所有输入流的barrier之后,才能向下游传递barrier。这就造成operator处于block等待状态,可能会影响整个作业的处理性能。

1. Job Manager中的Checkpoint Coordinator向所有source端发送触发Checkpoint的通知,并在source端注入barrier事件。
2. Source端向下游传递barrier,并将自己的状态异步地写入到持久化存储中。
3. Operator接收到source端传递的barrier之后,会对operator的输入流进行对齐barrier,然后向输出流传递barrier,并将自己的状态异步的写入到持久化存储中。
4. 当sink端接收到所有输入流传递过来的barrier之后,就会向Checkpoint Coordinator通知,此次Checkpoint执行完成。

Flink在整个Checkpoint的执行过程中,不仅会存储此次Checkpoint的状态数据,同时也存储每个算子的状态数据。

从Flink 1.11版本开始,Checkpoint也可以不对齐地执行了。这种情况下,Flink同样会在source端注入barrier,但只在sink端进行对齐,中间的operator在接收到barrier之后立即传递给它的下游operator

二、状态后端

Flink 提供了多种 state backends,它用于指定状态的存储方式和位置。

状态可以位于 Java 的堆或堆外内存。取决于 state backend,Flink 也可以自己管理应用程序的状态。为了让应用程序可以维护非常大的状态,Flink 可以自己管理内存(如果有必要可以溢写到磁盘)。默认情况下,所有 Flink Job 会使用配置文件 flink-conf.yaml 中指定的 state backend。

但是,配置文件中指定的默认 state backend 会被 Job 中指定的 state backend 覆盖。

1、状态管理器分类

MemoryStateBackend

默认的方式,即基于 JVM 的堆内存进行存储,主要适用于本地开发和调试。

FsStateBackend

基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

RocksDBStateBackend

RocksDBStateBackend 是 Flink 内置的第三方状态管理器,采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时,再将其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。之所以这样做是因为 RocksDB 作为嵌入式数据库安全性比较低,但比起全文件系统的方式,其读取速率更快;比起全内存的方式,其存储空间更大,因此它是一种比较均衡的方案。

2、配置方式

Flink 支持使用两种方式来配置后端管理器:

第一种方式:基于代码方式进行配置,只对当前作业生效:

    // 配置 FsStateBackend
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    // 配置 RocksDBStateBackend
    env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

配置 RocksDBStateBackend 时,需要额外导入下面的依赖:

    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>

第二种方式:基于 flink-conf.yaml 配置文件的方式进行配置,对所有部署在该集群上的作业都生效:

    state.backend: filesystem
    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

三、状态一致性

1、端到端(end-to-end)

在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:

内部保证:依赖checkpoint

source 端:需要外部源可重设数据的读取位置

sink 端:需要保证从故障恢复时,数据不会重复写入外部系统。

而对于sink端,又有两种具体的实现方式:

幂等(Idempotent)写入:所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

事务性(Transactional)写入:需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。

对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。

Flink DataStream API 提供了GenericWriteAheadSink 模板类和 TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。

2、Flink+Kafka 实现端到端的 exactly-once语义

端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

内部:利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性

source:kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

sink:kafka producer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction内部的checkpoint机制。

Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

  1. 当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。
  2. 每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。
  3. 每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。
  4. sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。
  5. 当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。

执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

  • 第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”, jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
  • sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
  • jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
  • sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
  • 外部kafka关闭事务,提交的数据可以正常消费了。

MapReduce 基础 工作原理

mapreduce 是什么?

MapReduce是Hadoop的核心之一,可以分成Map和Reduce两部分理解。

1.Map:映射过程,把一组数据按照某种Map函数映射成新的数据。我们将这句话拆分提炼出重要信息,也就是说,map主要是:映射、变换、过滤的过程。一条数据进入map会被处理成多条数据,也就是1进N出。

2.Reduce:归纳过程,把若干组映射结果进行汇总并输出。我们同样将重要信息提炼,得到reduce主要是:分解、缩小、归纳的过程。一组数据进入reduce会被归纳为一组数据(或者多组数据),也就是一组进N出。

graph LR
    A[输入数据集] --map--> B[中间结果数据集]
    B--reduce-->C[最终结果数据集]

mapreduce的基本工作流程

分片、格式化数据源

输入 Map 阶段的数据源,必须经过分片和格式化操作。

分片操作:

指的是将源文件划分为大小相等的小数据块( Hadoop 2.x 中默认 128MB ),也就是分片( split ),
Hadoop 会为每一个分片构建一个 Map 任务,并由该任务运行自定义的 map() 函数,从而处理分片里的每一条记录;

格式化操作:

将划分好的分片( split )格式化为键值对形式的数据,其中, key 代表偏移量, value 代表每一行内容。

执行 MapTask

每个 Map 任务都有一个内存缓冲区(缓冲区大小 100MB ),输入的分片( split )数据经过 Map 任务处理后的中间结果会写入内存缓冲区中。
如果写人的数据达到内存缓冲的阈值( 80MB ),会启动一个线程将内存中的溢出数据写入磁盘,同时不影响 Map 中间结果继续写入缓冲区。
在溢写过程中, MapReduce 框架会对 key进行排序,如果中间结果比较大,会形成多个溢写文件,最后的缓冲区数据也会全部溢写入磁盘形成一个溢写文件,如果是多个溢写文件,则最后合并所有的溢写文件为一个文件。

执行 Shuffle 过程

MapReduce 工作过程中, Map 阶段处理的数据如何传递给 Reduce 阶段,这是 MapReduce 框架中关键的一个过程,这个过程叫作 Shuffle 。
Shuffle 会将 MapTask 输出的处理结果数据分发给 ReduceTask ,并在分发的过程中,对数据按 key 进行分区和排序。

执行 ReduceTask

输入 ReduceTask 的数据流是形式,用户可以自定义 reduce()方法进行逻辑处理,最终以的形式输出。

写入文件

MapReduce 框架会自动把 ReduceTask 生成的传入 OutputFormat 的 write 方法,实现文件的写入操作。
graph LR
  subgraph 整体流程
    direction TB
    A1[分片 格式化] --> A2[执行Map Task]
    A2-->A3[执行Shuffle]
    A3-->A4[执行Reduce Task]
    A4-->A5[落盘 写入文件]
  end
  整体流程 --> B[分片 格式化]
  整体流程 --> C[执行Map Task]
  整体流程-->D[执行Shuffle]
  整体流程-->E[执行Reduce Task]
  整体流程-->F[落盘 写入文件]
  B-->B1(分片 128M =>MapTask.map)
  B-->B2(格式化 split=>key,value) -->B3(exg: wordcount key 偏移量  value 每一行的内容)
  C-->C1(1 每个 Map 任务都有一个内存缓冲区 100MB )
  C-->C2(2 map处理后的结果写入内存缓存区)
  C-->C3(3 内存缓存区达到80% key排序溢写到文件)
  C-->C4(4 多个溢写文件合并)
  D-->D1(maptask=>reducetask 分发和排序)
  E-->E1(key, value-list => key value)
  F-->F1(key,value => OutputFormat.write)

MapTask 详解

  1. Read 阶段: MapTask 通过用户编写的 RecordReader ,从输入的 InputSplit 中解析出一个个 key / value 。

  2. Map 阶段:将解析出的 key / value 交给用户编写的 Map ()函数处理,并产生一系列新的 key / value 。

  3. Collect 阶段:在用户编写的 map() 函数中,数据处理完成后,一般会调用 outputCollector.collect() 输出结果,在该函数内部,它会将生成的 key / value 分片(通过调用 partitioner ),并写入一个环形内存缓冲区中(该缓冲区默认大小是 100MB )。

  4. Spill 阶段:即“溢写”,当缓冲区快要溢出时(默认达到缓冲区大小的 80 %),会在本地文件系统创建一个溢出文件,将该缓冲区的数据写入这个文件。

    将数据写入本地磁盘前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
    写入磁盘之前,线程会根据 ReduceTask 的数量,将数据分区,一个 Reduce 任务对应一个分区的数据。
    这样做的目的是为了避免有些 Reduce 任务分配到大量数据,而有些 Reduce 任务分到很少的数据,甚至没有分到数据的尴尬局面。
    如果此时设置了 Combiner ,将排序后的结果进行 Combine 操作,这样做的目的是尽可能少地执行数据写入磁盘的操作。
  5. Combine 阶段:当所有数据处理完成以后, MapTask 会对所有临时文件进行一次合并,以确保最终只会生成一个数据文件

    合并的过程中会不断地进行排序和 Combine 操作,
    其目的有两个:一是尽量减少每次写人磁盘的数据量;二是尽量减少下一复制阶段网络传输的数据量。
    最后合并成了一个已分区且已排序的文件。

ReduceTask 详解

  1. Copy 阶段: Reduce 会从各个 MapTask 上远程复制一片数据(每个 MapTask 传来的数据都是有序的),并针对某一片数据,如果其大小超过一定國值,则写到磁盘上,否则直接放到内存中

  2. Merge 阶段:在远程复制数据的同时, ReduceTask 会启动两个后台线程,分别对内存和磁盘上的文件进行合并,以防止内存使用过多或者磁盘文件过多。

  3. Sort 阶段:用户编写 reduce() 方法输入数据是按 key 进行聚集的一组数据。

    为了将 key 相同的数据聚在一起, Hadoop 采用了基于排序的策略。
    由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此, ReduceTask 只需对所有数据进行一次归并排序即可。
  4. Reduce 阶段:对排序后的键值对调用 reduce() 方法,键相等的键值对调用一次 reduce()方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到 HDFS 中

  5. Write 阶段: reduce() 函数将计算结果写到 HDFS 上。

    合并的过程中会产生许多的中间文件(写入磁盘了),但 MapReduce 会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到 Reduce 函数。

https://blog.csdn.net/weixin_43542605/article/details/122288056

Hadoop HA 基本认知

Hadoop HA(High Available)经过同时配置两个处于Active/Passive模式的Namenode,分别叫Active Namenode和Standby Namenode。 Standby Namenode做为热备份,从而容许在机器发生故障时可以快速进行故障转移,同时在平常维护的时候使用优雅的方式进行Namenode切换。Namenode只能配置一主一备,不能多于两个Namenode。

Hadoop HA基本流程

集群总体上可以分为三部分:NameNode集群、JournalNode集群和Zookeeper集群。NameNode在某一时刻只有一个处于活跃状态,其他的都处于standby状态;JournalNode负责把edits文件传到standby的NameNode上;Zookeeper负责监控NameNode宕机情况,ZKFC(ZookeeperFailoverController)是专门监控NameNode健康的。

为了同步NameNode的元数据一致,有专门的JournalNode来同步元数据文件,活跃的NameNode的edits文件会写入journalnode集群,其他standby的结点会去读取journalnode上的edits文件,以此来同步自身的元数据。

  1. ZKFC的HealthyMonitor是监控NameNode的进程,是专门监控NameNode将康情况的进程。
  2. HealthyMonitor会定时想ZKFC进程报告NameNode情况。
  3. 当HealthyMonitor出现汇报了NameNode,ZKFC就会向AcitveStandbyEloctor报告。
  4. AcitveStandbyEloctor接到NameNode宕机报告就会通知zk集群选举出新的NameNode。
  5. zk集群经过内部选举,返回一个standby的NameNode给AcitveStandbyEloctor。
  6. AcitveStandbyEloctor想ZKFC报告选举结果。
  7. ZKFC为了防止是网络原因导致NameNode假死,就会结束NameNode进程。
  8. zk集群就会通知另一个ZKFC要求它修改它监控的NameNode的进程为活跃节点。

HA技术关键点

HA问题中需要解决的两个问题:

  • 元数据一致性:Standby节点和Active节点的元数据一致性。
  • 主备自动切换:Active节点服务中断时,Standby节点可以立即启动对外提供服务。

为了确保故障转移可以快速完成,Standby Namenode须要维护最新的Block位置信息,即每一个Block副本存放在集群中的哪些节点上。为了达到这一点,Datanode同时配置主备两个Namenode,并同时发送Block报告和心跳到两台Namenode。

确保任什么时候刻只有一个Namenode处于Active状态很是重要,不然可能出现数据丢失或者数据损坏。当两台Namenode都认为本身的Active Namenode时,会同时尝试写入数据(不会再去检测和同步数据)。为了防止这种脑裂现象,Journal Nodes只容许一个Namenode写入数据,内部经过维护epoch数来控制,从而安全地进行故障转移。

HA其本质上就是要保证主备NN元数据是保持一致的,即保证fsimage和editlog在备NN上也是完整的。元数据的同步很大程度取决于EditLog的同步,而这步骤的关键就是共享文件系统

有两种方式能够进行edit log共享:

  1. 使用QJM(Quorum Journal Manager)共享edit log
  2. 使用NFS(Network File System)共享edit log(存储在NAS/SAN)

NFS的方式

all name space edits logged to shared storage;Block reports are sent to both name nodes

显然NFS做为主备Namenode的共享存储。这种方案可能会出现脑裂(split-brain),即两个节点都认为本身是主Namenode并尝试向edit log写入数据,这可能会致使数据损坏。经过配置fencin脚原本解决这个问题,fencing脚本用于:

  • 将以前的Namenode关机
  • 禁止以前的Namenode继续访问共享的edit log文件

使用这种方案,管理员就能够手工触发Namenode切换,而后进行升级维护。但这种方式存在如下问题:

  • 只能手动进行故障转移,每次故障都要求管理员采起措施切换。
  • NAS/SAN设置部署复杂,容易出错,且NAS自己是单点故障。
  • Fencing 很复杂,常常会配置错误。
  • 没法解决意外(unplanned)事故,如硬件或者软件故障。oop
    所以须要另外一种方式来处理这些问题:
  • 自动故障转移(引入ZooKeeper达到自动化)
  • 移除对外界软件硬件的依赖(NAS/SAN)
  • 同时解决意外事故及平常维护致使的不可用

Quorum Journal Manager

QJM(Quorum Journal Manager)是Hadoop专门为Namenode共享存储开发的组件,一般是奇数点结点组成。其集群运行一组Journal Node,每一个Journal 节点暴露一个简单的RPC接口,容许Namenode读取和写入数据,数据存放在Journal节点的本地磁盘。当Namenode写入edit log时,NameNode会同时向所有JournalNode并行写文件,当超过半数节点回复确认成功写入以后,edit log就认为是成功写入。

1、 QJM写过程

NameNode 会把 EditLog 同时写到本地和 JournalNode 中。写本地由配置中的参数dfs.namenode.name.dir来控制,写JN由参数dfs.namenode.shared.edits.dir控制,在写EditLog时会由两个不同的输出流来控制日志的写过程,分别是:

  • EditLogFileOutputStream(本地输出流)
  • QuorumOutputStream(JN输出流)

NameNode在写EditLog时,并不是直接写到磁盘中,为保证高吞吐,NameNode会分别为EditLogFileOutputStream和QuorumOutputStream定义两个同等大小的Buffer,大小大概是512KB,一个写Buffer(buffCurrent),一个同步Buffer(buffReady),这样可以一边写一边同步,所以EditLog是一个异步写过程,同时也是一个批量同步的过程,避免每写一笔就同步一次日志。

这个是怎么实现边写边同步的呢,这中间其实是有一个缓冲区交换的过程,即bufferCurrent和buffReady在达到条件时会触发交换,如bufferCurrent在达到阈值同时bufferReady的数据又同步完时,bufferReady数据会清空,同时会将bufferCurrent指针指向bufferReady以满足继续写,另外会将bufferReady指针指向bufferCurrent以提供继续同步EditLog。

flowchart TD
    A((Client)) --修改--> B(NameNode)
    B-->C(本地 \n EditLogFileOutputStream)
    B-->D(JournalNode \n QuorumOutputStream)
    C-->E(bufferCurrent)
    C-->F(bufferReady)
    D-->H(bufferCurrent)
    D-->I(bufferReady)
    E-->M(本地目录)
    F-->M
    H-->N(JournalNode)
    I-->N

这里有一个问题,既然EditLog是异步写的,怎么保证缓存中的数据不丢呢,其实这里虽然是异步,但实际所有日志都需要通过logSync同步成功后才会给client返回成功码,假设某一时刻NameNode不可用了,其内存中的数据其实是未同步成功的,所以client会认为这部分数据未写成功。

(1)隔离双写

在Active NN每次同步EditLog到JN时,先要保证不会有两个NN同时向JN同步日志。这涉及一个很重要的概念Epoch Numbers,很多分布式系统都会用到。

成为Active结点时,其会被赋予一个EpochNumber,每个EpochNumber是惟一的,不会有相同的EpochNumber出现。EpochNumber有严格顺序保证,每次NN切换后其EpochNumber都会自增1,后面生成的EpochNumber都会大于前面的EpochNumber。QJM是怎么保证上面特性的呢,主要有以下几点:

  1. 在对EditLog作任何修改前,QJM(NameNode上)必须被赋予一个EpochNumber;
  2. QJM把自己的EpochNumber通过newEpoch(N)的方式发送给所有JN结点;
  3. 当JN收到newEpoch请求后,会把QJM的EpochNumber保存到一个lastPromisedEpoch变量中并持久化到本地磁盘;
  4. ANN同步日志到JN的任何RPC请求(如logEdits(),startLogSegment()等),都必须包含ANN的EpochNumber;
  5. JN在收到RPC请求后,会将之与lastPromisedEpoch对比,如果请求的EpochNumber小于lastPromisedEpoch,将会拒绝同步请求,反之,会接受同步请求并将请求的EpochNumber保存在lastPromisedEpoch;

这样就能保证主备NN发生切换时,就算同时向JN同步日志,也能保证日志不会写乱,因为发生切换后,原ANN的EpochNumber肯定是小于新ANN的EpochNumber,所以原ANN向JN的发起的所有同步请求都会拒绝,实现隔离功能,防止了脑裂。

(2)恢复in-process日志

如果在写过程中写失败了,可能各个JN上的EditLog的长度都不一样,需要在开始写之前将不一致的部分恢复。恢复机制如下:

  1. Active NN先向所有JN发送getJournalState请求;
  2. JN会向ANN返回一个Epoch(lastPromisedEpoch);
  3. Active NN收到大多数JN的Epoch后,选择最大的一个并加1作为当前新的Epoch,然后向JN发送新的newEpoch请求,把新的Epoch下发给JN;
  4. JN收到新的Epoch后,和lastPromisedEpoch对比,若更大则更新到本地并返回给Active NN自己本地一个最新EditLogSegment起始事务Id,若小则返回NN错误;
  5. Active NN收到多数JN成功响应后认为Epoch生成成功,开始准备日志恢复;
  6. Active NN会选择一个最大的EditLogSegment事务ID作为恢复依据,然后向JN发送prepareRecovery; RPC请求,对应Paxos协议2p阶段的Phase1a,若多数JN响应prepareRecovery成功,则可认为Phase1a阶段成功;
  7. Active NN选择进行同步的数据源,向JN发送acceptRecovery RPC请求,并将数据源作为参数传给JN。
  8. JN收到acceptRecovery请求后,会从JournalNodeHttpServer下载EditLogSegment并替换到本地保存的EditLogSegment,对应Paxos协议2p阶段的Phase1b,完成后返回Active NN请求成功状态。
  9. Active NN收到多数JN的响应成功请求后,向JN发送finalizeLogSegment请求,表示数据恢复完成,这样之后所有JN上的日志就能保持一致。 数据恢复后,Active NN上会将本地处于in-process状态的日志更名为finalized状态的日志,形式如editsstart-txidstop-txid。

(3)日志同步

  1. 执行logSync过程,将ANN上的日志数据放到缓存队列中
  2. 将缓存中数据同步到JN,JN有相应线程来处理logEdits请求
  3. JN收到数据后,先确认EpochNumber是否合法,再验证日志事务ID是否正常,将日志刷到磁盘,返回ANN成功码
  4. ANN收到JN成功请求后返回client写成功标识,若失败则抛出异常

通过上面一些步骤,日志能保证成功同步到JN,同时保证JN日志的一致性,进而备NN上同步日志时也能保证数据是完整和一致的。

2、QJM读过程

读过程是面向备NN(Standby NN)的,Standby NN定期检查JournalNode上EditLog的变化,然后将EditLog拉回本地。Standby NN上有一个线程StandbyCheckpointer,会定期将Standby NN上FSImage和EditLog合并,并将合并完的FSImage文件传回主NN(Active NN)上,就是所说的Checkpointing过程。下面我们来看下Checkpointing是怎么进行的。

在2.x版本中,已经将原来的由SecondaryNameNode主导的Checkpointing替换成由Standby NN主导的Checkpointing。

  1. 在Standby NN上先检查前置条件,前置条件包括两个方面:距离上次Checkpointing的时间间隔和EditLog中事务条数限制。
  2. 前置条件任何一个满足都会触发Checkpointing,然后SNN会将最新的NameSpace数据即SNN内存中当前状态的元数据保存到一个临时的fsimage文件( fsimage.ckpt)
  3. 然后比对从JN上拉到的最新EditLog的事务ID,将fsimage.ckpt_中没有,EditLog中有的所有元数据修改记录合并一起并重命名成新的fsimage文件,同时生成一个md5文件。
  4. 将最新的fsimage再通过HTTP请求传回ANN。

通过定期合并fsimage有什么好处?

  1. 可以避免EditLog越来越大,合并成新fsimage后可以将老的EditLog删除
  2. 可以避免主NN(ANN)压力过大,合并是在SNN上进行的
  3. 可以保证fsimage保存的是一份最新的元数据,故障恢复时避免数据丢失

三、主备自动切换
Hadoop的主备选举依赖于ZooKeeper。
整个切换过程是由ZKFC来控制的,ZKFC是实现主备切换的组件。每个运行的NameNode上都会有一个ZKFC进程(实际是一个Hadoop进程)。主要的功能如下:

  1. 健康检测:ZKFC会使用健康检测命令定期的ping同节点中的NameNode,只要该NameNode及时的回复健康,则任务当前NameNode是健康的;
  2. Zookeeper会话管理: 当本地NameNode是健康的,ZKFC会保持一个在Zookeeper中打开的会话。如果本地NameNode处于Active状态,ZKFC会保持一个特殊的znode锁,如果回话中断,锁节点讲自动删除;
  3. 基于Zookeeper的选举: 如果本地的NameNode是健康的,且ZKFC发现没有其他的节点持有当前的znode锁,它会为自己获取该锁。如果成功则进行故障切换,并且确保之前的NameNode的进程中断,将本地NameNode切换为Active;

在故障切换期间,ZooKeeper主要是发挥什么作用有以下几点:

  1. 失败保护:集群中每一个NameNode都会在ZooKeeper维护一个持久的session,机器一旦挂掉,session就会过期,故障迁移就会触发;
  2. Active NameNode选择:ZooKeeper有一个选择ActiveNN的机制,一旦现有的ANN宕机,其他NameNode可以向ZooKeeper申请排他成为下一个Active节点;
  3. 防脑裂: ZK本身是强一致和高可用的,可以用它来保证同一时刻只有一个活动节点;

参考链接:
https://blog.csdn.net/weixin_43854618/article/details/108808274

https://blog.csdn.net/shan19920501/article/details/124911283