spark stages 一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。
stage
stage是什么?
Stage:调度阶段
一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。
- stage是由一组并行的task组成,stage会将一批task用TaskSet来封装,提交给TaskScheduler进行分配,最后发送到Executor执行
- stage的划分依据就是看是否产生了shuflle(即宽依赖–reduceByKey, groupByKey等算子),遇到一个shuffle操作就划分为前后两 个stage
- spark job是根据action算子触发的,遇到action算子就会起一个job
- 同一个Stage内的所有Transformation算子所操作的RDD都是具有相同的Partition数量的
stage划分
关键点是Spark Stage划分依据主要是基于Shuffle
Spark此时就利用了前文提到的依赖关系,调度器从DAG图末端出发,逆向遍历整个依赖关系链(就是从最后一个RDD往前推算),遇到ShuffleDependency(宽依赖关系的一种叫法)就断开,遇到NarrowDependency就将其加入到当前stage。
每个Stage里task的数量由Stage最后一个RDD中的分区数决定。如果Stage要生成Result,则该Stage里的Task都是ResultTask,否则是ShuffleMapTask。
- ShuffleMapTask的计算结果需要shuffle到下一个Stage,其本质上相当于MapReduce中的mapper
- ResultTask则相当于MapReduce中的reducer
ShuffleMapTask is a Task to produce a MapStatus (Task[MapStatus]).
ShuffleMapTask is one of the two types of Tasks. When executed, ShuffleMapTask writes the result of executing a serialized task code over the records (of a RDD partition) to the shuffle system and returns a MapStatus (with the BlockManager and estimated size of the result shuffle blocks).
ResultTask[T, U] is a Task that executes a partition processing function on a partition with records (of type T) to produce a result (of type U) that is sent back to the driver.
- job的最后一个阶段是由多个ResultTasks组成的,之前的stages由ShuffleMapTasks组成。
- ResultTask执行task并将task输出返回给driver Application。
- ShuffleMapTask执行task,并将task输出分配给多个bucket(基于task的partitioner个数)。
为什么是从后往前推导?
因为RDD之间是有血缘关系的,后面的RDD依赖前面的RDD,也就是说后面的RDD要等前面的RDD执行完才会执行。 所以从后往前遇到宽依赖就划分为两个stage,shuffle前一个,shuffle后一个。如果整个过程没有产生shuffle那就只会有一个stage。
Stage的调度是由DAG Scheduler完成的。由RDD的有向无环图DAG切分出了Stage的有向无环图DAG
从后往前遍历到最开始执行的Stage执行,如果提交的Stage仍有未完成的父Stage,则Stage需要等待其父Stage执行完才能执行。
spark的作业调度
RDD的操作分为transformation和action两类,真正的作业提交运行发生在action之后,调用action之后会将对原始输入数据的所有transformation操作封装成作业并向集群提交运行:
- 由DAGScheduler对RDD之间的依赖性进行分析,通过DAG来分析各个RDD之间的转换依赖关系
- 根据DAGScheduler分析得到的RDD依赖关系将Job划分成多个stage
- 每个stage会生成一个TaskSet并提交给TaskScheduler,调度权转交给TaskScheduler,由它来负责分发task到worker执行
宽窄依赖
Spark中RDD的粗粒度操作,每一次transformation都会生成一个新的RDD,这样就会建立RDD之间的前后依赖关系,在Spark中,依赖关系被定义为两种类型:宽依赖(Shuffle Dependency)与窄依赖(Narrow Dependency)
- 窄依赖,父RDD的分区最多只会被子RDD的一个分区使用
- 宽依赖,父RDD的一个分区会被子RDD的多个分区使用(宽依赖指子RDD的每个分区都要依赖于父RDD的所有分区,这是shuffle类操作)
区分宽窄依赖,我们主要从父RDD的Partition流向来看:流向单个RDD就是窄依赖,流向多个RDD就是宽依赖。
- 对于窄依赖,子rdd一个分区数据丢失只需要对一个父rdd进行重算,重算利用率100%。
- 对于宽依赖,子rdd一个分区数据丢失需要多该分区依赖的所有父rdd分区进行重算,重算利用率低。
并行度
并行度(paralleism):在分布式计算框架中,一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正实现多个任务并行执行,记住,这里是并行,而不是并发,这里我们将整个集群并行执行任务的数量,成为并行度。
spark中的并行度和分区之间是有关系的,rdd的每一个分区都是一个task,然后传送到对应的executor中进行计算。如果资源充足(executor core数=task数)并行度就等于分区数,如果(executor core数< task数)就是并发执行。
spark根据分区数来决定task的个数,而task的个数和executor所拥有的core数来决定着spark的并行度,当task数多余core数时,就会产生并发操作
改变并行度(parallelism)
- 设置合理的task数量,至少设置成与spark Application (executor)的总cpu core 数量相同。比如:150个分区,150个task,150个core,差不多每个task同时运行完毕。(官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 ,比如150个cpu core ,基本设置 task数量为 300 ~ 500)
- 重新设置RDD的分区数,常见的方法有repartitions 、 coalesce、join、以及一些会产生宽依赖的算子。
- 一个stage的并行度由stage的最后一个rdd的分区决定。可以通过spark.default.parallelism可以设置当前stage的并行度
spark shuffle
shuffle是spark中数据重分发的一种机制,以便于在跨分区进行数据的分组。shuffle通常会引起executor
与节点之间的数据复制,这期间会有大量的网络I/O,磁盘I/O和数据的序列化。
- 在shuffle内部,单个map tasks的结果被保存在内存中,直到放不下为止。然后,根据目标分区对它们进行
排序,并将它们写入单个文件。在reduce端,tasks会读取相关的经过排序的数据块。
- shuffle还会在磁盘上产生大量的中间文件,这样做是为了当触发重算的时候这些中间文件不用被重新创建。
- 垃圾收集可能会发生在很长的一段时间之后,如果应用程序保留了对这些RDD的引用,或者垃圾收集不经常启动的话这
意味着对于一个运行时长较长的spark作业,它可能会消耗大量的磁盘空间。这些中间文件的存储目录在配置Spark
Context时由spark.local.dir参数明确指定。