spark exactly once 语义随笔

Spark具备很好的机制来保证exactly once的语义,具体体现在数据源的可重放性、计算过程中的容错性、以及写入存储介质时的幂等性或者事务性。exactly once指的是在处理数据的过程中,系统有很好的容错性(fault-tolerance),能够保证数据处理不重不丢,每一条数据仅被处理一次。

语言所蕴含的意义就是语义(semantic)。简单的说,符号是语言的载体。符号本身没有任何意义,只有被赋予含义的符号才能够被使用,这时候语言就转化为了信息,而语言的含义就是语义。
语义可以简单地看作是数据所对应的现实世界中的事物所代表的概念的含义,以及这些含义之间的关系,是数据在某个领域上的解释和逻辑表示。

一、三种语义

spark exactly once 语义是spark很重要的一个概念。分布式系统中都基本上会有三种关键的语义

  • at most once:至多一次,数据可能会丢,但不会重复
  • at least once:至少一次,数据肯定不会丢失,但可能重复
  • exactly once:有且只有一次,数据不丢失不重复,且只出现一次; 该语义是最理想的,但也难以实现

Spark具备很好的机制来保证exactly once的语义,具体体现在数据源的可重放性、计算过程中的容错性、以及写入存储介质时的幂等性或者事务性。exactly once指的是在处理数据的过程中,系统有很好的容错性(fault-tolerance),能够保证数据处理不重不丢,每一条数据仅被处理一次。

一个典型的 Spark Streaming 应用程序会包含三个处理阶段:接收数据、处理汇总、输出结果。每个阶段都需要做不同的处理才能实现相应的语义

二、接收数据 – 数据源的可重放性

取决于上游数据源的特性

  • HDFS 这类支持容错的文件系统中读取文件,能够直接支持 Exactly-once 语义。
  • 上游消息系统支持 ACK(如RabbitMQ),我们就可以结合 Spark 的 Write Ahead Log 特性来实现 At-least-once 语义。
  • 非可靠的数据接收器(如 socketTextStream),当 Worker 或 Driver 节点发生故障时就会产生数据丢失,提供的语义也是未知的。
  • Kafka 消息系统是基于偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 语义

SparkStreaming接入Kafka的数据有两种模式

一种为Receiver模式,一种为Direct模式。

Receiver模式

Receiver模式都不能够实现exactly once的语义,其根本原因是Kafka自己管理的offset与SparkStreaming实际处理数据的offset没有同步导致的。

Receiver模式采用Kafka的高阶consumer API,Kafka自己封装了对数据的获取逻辑,且通过Zookeeper管理offset信息,这种模式在与SparkStreaming对接时,有以下特点:

  1. Kafka中的partition数量与SparkStreaming中的并行度不是一一对应的,SparkStreaming通过创建Receiver去读取Kafka中数据,createStream()方法传入的并发参数代表的是读取Kafka中topic+partition的线程数,并不能提高SparkStreaming读取数据的并行度。
  2. Kafka自己管理offset,Receiver作为一个高层的Consumer来消费数据,其消费的偏移量(offset)由Kafka记录在Zookeeper中,一旦出现错误,那些已经标记为消费过的数据将会丢失。
  3. Receiver模式下,为了解决读取数据时的并行度问题,可以创建多个DStream,然后union起来
  4. 为了解决数据丢失的问题,可以选择开启Spark的WAL(write ahead log)机制,每次处理数据前将预写日志写入到HDFS中,如果节点出现错误,可以从WAL中恢复。这种方法其实效率低下,不仅数据冗余(Kafka中有副本机制,Spark中还要存一份),且无法保证exactly once,数据可能重复消费。

Direct模式

Spark1.3中引入了Direct模式来替代Receiver模式,它使用Kafka的Simple consumer API,由Spark应用自己管理offset信息,以达成exactly once的语义,其特点如下:

  1. Kafka中的partition与SparkStreaming中的partition一一对应,也就是SparkStreaming读取数据的并行度取决于Kafka中partition的数量。
  2. 不依赖Receiver,而是通过低阶api直接找到topic+partition的leader获取数据,并由SparkStreaming应用自己负责追踪维护消费的offset。
  3. Direct模式下,SparkStreaming应用管理offset的方法案例,其中offset依然是存放在zookeeper中,但是由应用自身来管理的,offset也可以放在Redis、MySQL、HBase中进行管理,根据具体情况进行选择。

转化或汇总 – 计算过程中的容错性

Spark RDD 本身就是一种具备容错性、不变性、以及计算确定性的数据结构。只要数据来源是可用的,且处理过程中没有副作用(Side effect),我们就能一直得到相同的计算结果;撇去Driver与Executor的高可用性不说,Spark应用内部则采用checkpoint和lineage的机制来确保容错性。

lineage

一般翻译为血统,简单来说就是RDD在转化的过程中,由于父RDD与子RDD存在依赖关系(Dependency),从而形成的lineage,也可以理解为lineage串起了RDD DAG。

RDD可以进行缓存,通过调用persist或者cache方法,将RDD持久化到内存或者磁盘中,这样缓存的RDD就可以被保留在计算节点的内存中被重用,缓存是构建Spark快速迭代的关键。

当一个RDD丢失的情况下,Spark会去寻找它的父RDD是否已经缓存,如果已经缓存,就可以通过父RDD直接算出当前的RDD,从而避免了缓存之前的RDD的计算过程,且只有丢失数据的partition需要进行重算,这样Spark就避免了RDD上的重复计算,能够极大的提升计算速度。

缓存虽然可以提升Spark快速迭代计算的速度,但是缓存是会丢失的。

checkpoint

检查点机制就是为了可以切断lineage的依赖关系,在某个重要的节点,将RDD持久化到文件系统中(一般选择HDFS),这样就算之前的缓存已经丢失了,也可以保证检查点数据不会丢失,这样在恢复的时候,会直接从检查点的数据开始进行计算,检查点机制在SparkStreaming这种流式计算中发挥的作用会更大。

可以通过以下源码为入口进一步了解Spark的缓存和检查点机制,RDD在进行计算的时候会调用其iterator方法,在该方法中会首先去读取缓存的数据,如果没有缓存的数据则会去读取checkpoint的数据

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

Spark在计算过程中采用的lineage和checkpoint机制相互结合,取长补短,再加上Spark各个组件底层本身就是具有高可用性,所以在Spark应用在转化计算的过程中,可是保证数据处理的exactly once。

结果输出 – 写入存储介质的幂等性或事务性

幂等更新

  • 多次写入会产生相同的结果数据,我们可以认为这类写入操作是幂等的;saveAsTextFile 就是一种典型的幂等写入
  • 幂等写入只适用于 Map-only 型的计算流程,即没有 Shuffle、Reduce、Repartition 等操作
  • 对 Kafka DStream 做一些额外设置:
    1. 将 enable.auto.commit 设置为 false。默认情况下,Kafka DStream 会在接收到数据后立刻更新自己的偏移量,我们需要将这个动作推迟到计算完成之后。
    2. 打开 Spark Streaming 的 Checkpoint 特性,用于存放 Kafka 偏移量。但若应用程序代码发生变化,Checkpoint 数据也将无法使用,这就需要改用在数据输出之后手动提交 Kafka 偏移量。HasOffsetRanges 类,以及 commitAsync API 可以做到这一点
      messages.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition { iter =>
          // output to database
        }
        messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }

事务更新

  • 务型写入时,我们需要生成一个唯一 ID,这个 ID 可以使用当前批次的时间、分区号、或是 Kafka 偏移量来生成
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
  • 通常会在 foreachPartition 方法中来执行数据库写入操作。对于 Map-only 流程来说是适用的
  • 这种流程下 Kafka 分区和 RDD 分区是一一对应的,我们可以用以下方式获取各分区的偏移量:
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val offsetRange = offsetRanges(TaskContext.get.partitionId)
  }
}
  • 对于包含 Shuffle 的计算流程(如上文的错误日志统计),我们需要先将处理结果拉取到 Driver 进程中,然后才能执行事务操作;如果偏移量写入失败,或者重复处理了某一部分数据(offset != $fromOffset 判断条件不通过),该事务就会回滚,从而做到 Exactly-once。
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val result = processLogs(rdd).collect() // parse log and count error
  DB.localTx { implicit session =>
    result.foreach { case (time, count) =>
      // save to error_log table
    }
    offsetRanges.foreach { offsetRange =>
      val affectedRows = sql"""
      update kafka_offset set offset = ${offsetRange.untilOffset}
      where topic = ${topic} and `partition` = ${offsetRange.partition}
      and offset = ${offsetRange.fromOffset}
      """.update.apply()

      if (affectedRows != 1) {
        throw new Exception("fail to update offset")
      }
    }
  }
}

exactly once固然是个理想的状态,但其实现成本也是非常高的,在对数据可靠性要求不是很高的场景中,at-least-once甚至丢失少量数据也是可以作为一个选项考虑的

总结

官方已经不再推荐使用Receiver模式

Receiver接收方式

  • 多个Receiver接受数据效率高,但有丢失数据的风险
  • 开启日志(WAL)可防止数据丢失,但写两遍数据效率低。
  • Zookeeper维护offset有重复消费数据可能。
  • 使用高层次的API

Direct直连方式

  • 不使用Receiver,直接到kafka分区中读取数据
  • 不使用日志(WAL)机制
  • Spark自己维护offset
  • 使用低层次的API

发表回复