spark serialization Kryo序列化

Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多

spark 序列化

Spark中,主要有三个涉及到序列化的情况:

  1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输 — 也就是初始化工作是在Driver端进行的,而程序实际运行是在Executor端进行的; 涉及跨进程通信了,所以要进行序列化
  2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

Kryo序列化框架

Kryo 是一个快速高效的Java对象图形序列化框架,主要特点是性能、高效和易用。该项目用来序列化对象到文件、数据库或者网 络。
但是,它也有一个致命的弱点:生成的byte数据中部包含field数据,对类升级的兼容性很差!所以,若用kryo序列化对象用于C/S架构的话,两边的Class结构要保持一致。

Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。

  • Kryo速度是Serializable的10倍。
  • 当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用kryo来序列化
  • 即使使用kryo序列化,也要继承Serializable接口
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/*
* Kryo 序列化 效率是Java序列化的10倍,但不支持全部类型
* */
case class Dog(name:String)
case class Cat(age:Int)
case class Animal(dog: Dog,cat: Cat)

object TestKyroDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TestKryo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Animal], classOf[Cat], classOf[Dog])) 
      //当序列化的类中包含其他引用类型,最好把其他类型也加入进来,没加入进来的类会以全类名方式进行存储
      val spark = SparkSession.builder().config(conf).getOrCreate()
    val fr = spark.createDataFrame(Seq(new Animal(new Dog("he"),new Cat(22))))
    fr.cache().collect()
  }
}

参考链接:
https://blog.csdn.net/qq_43288259/article/details/116749301
https://blog.csdn.net/qq_43192537/article/details/110389236

Spark ShuffeManager 基本介绍

Shuffle 可以简单理解成数从新洗牌的过程。过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段

Shuffle 可以简单理解成数从新洗牌的过程。过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle

负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。

  1. 在Spark1.2以前,默认的shuffle计算引擎是HashShuffleManager。
  2. 在Spark1.2以后的版本中,默认的ShuffleManager变成SortShuffleManager
  3. SortShuffleManager有两种机制,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于 spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制

HashShuffeManager VS SortShuffeManager

  • HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
  • SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle-read-task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可

HashShuffleManager运行原理:

  1. 未经优化的HashShuffleManager:

    • shuffle-write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子
      (比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算
      法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。

    • 在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

    • 那么每个执行shuffle-write的task,要为下一个stage创建多少个磁盘文件呢?下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。

      比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。

    • shuffle-read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。

      由于shufflewrite的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffleread的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

  2. 优化后的HashShuffleManager:

    • spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。
    • 开启consolidate机制之后,在shuffle-write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPUcore,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
    • 当Executor的CPU-core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。
    • 因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能

      • 假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。
      • 但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:*CPUcore的数量下一个stage的task数量**。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

SortShuffleManager基本原理

  1. 普通运行机制:
    • 在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。
    • 如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。
    • 接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
    • 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
    • 写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
    • 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。
    • 此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。

比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

  1. bypass运行机制:

    bypass运行机制的触发条件如下:

    1. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
    2. 不是聚合类的shuffle算子(比如reduceByKey)。
    • 此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。
    • 当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
    • 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
    • 而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。
    • 也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

spark stages 并行度 shuffle 宽窄依赖 随笔

spark stages 一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。

stage

stage是什么?

Stage:调度阶段

一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。

  • stage是由一组并行的task组成,stage会将一批task用TaskSet来封装,提交给TaskScheduler进行分配,最后发送到Executor执行
  • stage的划分依据就是看是否产生了shuflle(即宽依赖–reduceByKey, groupByKey等算子),遇到一个shuffle操作就划分为前后两 个stage
  • spark job是根据action算子触发的,遇到action算子就会起一个job
  • 同一个Stage内的所有Transformation算子所操作的RDD都是具有相同的Partition数量的

stage划分

关键点是Spark Stage划分依据主要是基于Shuffle

Spark此时就利用了前文提到的依赖关系,调度器从DAG图末端出发,逆向遍历整个依赖关系链(就是从最后一个RDD往前推算),遇到ShuffleDependency(宽依赖关系的一种叫法)就断开,遇到NarrowDependency就将其加入到当前stage。

每个Stage里task的数量由Stage最后一个RDD中的分区数决定。如果Stage要生成Result,则该Stage里的Task都是ResultTask,否则是ShuffleMapTask。

  • ShuffleMapTask的计算结果需要shuffle到下一个Stage,其本质上相当于MapReduce中的mapper
  • ResultTask则相当于MapReduce中的reducer

ShuffleMapTask is a Task to produce a MapStatus (Task[MapStatus]).

ShuffleMapTask is one of the two types of Tasks. When executed, ShuffleMapTask writes the result of executing a serialized task code over the records (of a RDD partition) to the shuffle system and returns a MapStatus (with the BlockManager and estimated size of the result shuffle blocks).

ResultTask[T, U] is a Task that executes a partition processing function on a partition with records (of type T) to produce a result (of type U) that is sent back to the driver.

  1. job的最后一个阶段是由多个ResultTasks组成的,之前的stages由ShuffleMapTasks组成。
  2. ResultTask执行task并将task输出返回给driver Application。
  3. ShuffleMapTask执行task,并将task输出分配给多个bucket(基于task的partitioner个数)。

为什么是从后往前推导?

因为RDD之间是有血缘关系的,后面的RDD依赖前面的RDD,也就是说后面的RDD要等前面的RDD执行完才会执行。 所以从后往前遇到宽依赖就划分为两个stage,shuffle前一个,shuffle后一个。如果整个过程没有产生shuffle那就只会有一个stage。

Stage的调度是由DAG Scheduler完成的。由RDD的有向无环图DAG切分出了Stage的有向无环图DAG

从后往前遍历到最开始执行的Stage执行,如果提交的Stage仍有未完成的父Stage,则Stage需要等待其父Stage执行完才能执行。

spark的作业调度

RDD的操作分为transformation和action两类,真正的作业提交运行发生在action之后,调用action之后会将对原始输入数据的所有transformation操作封装成作业并向集群提交运行:

  1. 由DAGScheduler对RDD之间的依赖性进行分析,通过DAG来分析各个RDD之间的转换依赖关系
  2. 根据DAGScheduler分析得到的RDD依赖关系将Job划分成多个stage
  3. 每个stage会生成一个TaskSet并提交给TaskScheduler,调度权转交给TaskScheduler,由它来负责分发task到worker执行

宽窄依赖

Spark中RDD的粗粒度操作,每一次transformation都会生成一个新的RDD,这样就会建立RDD之间的前后依赖关系,在Spark中,依赖关系被定义为两种类型:宽依赖(Shuffle Dependency)与窄依赖(Narrow Dependency)

  1. 窄依赖,父RDD的分区最多只会被子RDD的一个分区使用
  2. 宽依赖,父RDD的一个分区会被子RDD的多个分区使用(宽依赖指子RDD的每个分区都要依赖于父RDD的所有分区,这是shuffle类操作)

区分宽窄依赖,我们主要从父RDD的Partition流向来看:流向单个RDD就是窄依赖,流向多个RDD就是宽依赖。

  1. 对于窄依赖,子rdd一个分区数据丢失只需要对一个父rdd进行重算,重算利用率100%。
  2. 对于宽依赖,子rdd一个分区数据丢失需要多该分区依赖的所有父rdd分区进行重算,重算利用率低。

并行度

并行度(paralleism):在分布式计算框架中,一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正实现多个任务并行执行,记住,这里是并行,而不是并发,这里我们将整个集群并行执行任务的数量,成为并行度。

spark中的并行度和分区之间是有关系的,rdd的每一个分区都是一个task,然后传送到对应的executor中进行计算。如果资源充足(executor core数=task数)并行度就等于分区数,如果(executor core数< task数)就是并发执行。

spark根据分区数来决定task的个数,而task的个数和executor所拥有的core数来决定着spark的并行度,当task数多余core数时,就会产生并发操作

改变并行度(parallelism)

  1. 设置合理的task数量,至少设置成与spark Application (executor)的总cpu core 数量相同。比如:150个分区,150个task,150个core,差不多每个task同时运行完毕。(官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 ,比如150个cpu core ,基本设置 task数量为 300 ~ 500)
  2. 重新设置RDD的分区数,常见的方法有repartitions 、 coalesce、join、以及一些会产生宽依赖的算子。
  3. 一个stage的并行度由stage的最后一个rdd的分区决定。可以通过spark.default.parallelism可以设置当前stage的并行度

spark shuffle

shuffle是spark中数据重分发的一种机制,以便于在跨分区进行数据的分组。shuffle通常会引起executor
与节点之间的数据复制,这期间会有大量的网络I/O,磁盘I/O和数据的序列化。

  1. 在shuffle内部,单个map tasks的结果被保存在内存中,直到放不下为止。然后,根据目标分区对它们进行
    排序,并将它们写入单个文件。在reduce端,tasks会读取相关的经过排序的数据块。
  2. shuffle还会在磁盘上产生大量的中间文件,这样做是为了当触发重算的时候这些中间文件不用被重新创建。
  3. 垃圾收集可能会发生在很长的一段时间之后,如果应用程序保留了对这些RDD的引用,或者垃圾收集不经常启动的话这
    意味着对于一个运行时长较长的spark作业,它可能会消耗大量的磁盘空间。这些中间文件的存储目录在配置Spark
    Context时由spark.local.dir参数明确指定。