spark stages 并行度 shuffle 宽窄依赖 随笔

spark stages 一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。

stage

stage是什么?

Stage:调度阶段

一个任务集对应的调度阶段;每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段;Stage分成两种类型ShuffleMapStage、ResultStage。

  • stage是由一组并行的task组成,stage会将一批task用TaskSet来封装,提交给TaskScheduler进行分配,最后发送到Executor执行
  • stage的划分依据就是看是否产生了shuflle(即宽依赖–reduceByKey, groupByKey等算子),遇到一个shuffle操作就划分为前后两 个stage
  • spark job是根据action算子触发的,遇到action算子就会起一个job
  • 同一个Stage内的所有Transformation算子所操作的RDD都是具有相同的Partition数量的

stage划分

关键点是Spark Stage划分依据主要是基于Shuffle

Spark此时就利用了前文提到的依赖关系,调度器从DAG图末端出发,逆向遍历整个依赖关系链(就是从最后一个RDD往前推算),遇到ShuffleDependency(宽依赖关系的一种叫法)就断开,遇到NarrowDependency就将其加入到当前stage。

每个Stage里task的数量由Stage最后一个RDD中的分区数决定。如果Stage要生成Result,则该Stage里的Task都是ResultTask,否则是ShuffleMapTask。

  • ShuffleMapTask的计算结果需要shuffle到下一个Stage,其本质上相当于MapReduce中的mapper
  • ResultTask则相当于MapReduce中的reducer

ShuffleMapTask is a Task to produce a MapStatus (Task[MapStatus]).

ShuffleMapTask is one of the two types of Tasks. When executed, ShuffleMapTask writes the result of executing a serialized task code over the records (of a RDD partition) to the shuffle system and returns a MapStatus (with the BlockManager and estimated size of the result shuffle blocks).

ResultTask[T, U] is a Task that executes a partition processing function on a partition with records (of type T) to produce a result (of type U) that is sent back to the driver.

  1. job的最后一个阶段是由多个ResultTasks组成的,之前的stages由ShuffleMapTasks组成。
  2. ResultTask执行task并将task输出返回给driver Application。
  3. ShuffleMapTask执行task,并将task输出分配给多个bucket(基于task的partitioner个数)。

为什么是从后往前推导?

因为RDD之间是有血缘关系的,后面的RDD依赖前面的RDD,也就是说后面的RDD要等前面的RDD执行完才会执行。 所以从后往前遇到宽依赖就划分为两个stage,shuffle前一个,shuffle后一个。如果整个过程没有产生shuffle那就只会有一个stage。

Stage的调度是由DAG Scheduler完成的。由RDD的有向无环图DAG切分出了Stage的有向无环图DAG

从后往前遍历到最开始执行的Stage执行,如果提交的Stage仍有未完成的父Stage,则Stage需要等待其父Stage执行完才能执行。

spark的作业调度

RDD的操作分为transformation和action两类,真正的作业提交运行发生在action之后,调用action之后会将对原始输入数据的所有transformation操作封装成作业并向集群提交运行:

  1. 由DAGScheduler对RDD之间的依赖性进行分析,通过DAG来分析各个RDD之间的转换依赖关系
  2. 根据DAGScheduler分析得到的RDD依赖关系将Job划分成多个stage
  3. 每个stage会生成一个TaskSet并提交给TaskScheduler,调度权转交给TaskScheduler,由它来负责分发task到worker执行

宽窄依赖

Spark中RDD的粗粒度操作,每一次transformation都会生成一个新的RDD,这样就会建立RDD之间的前后依赖关系,在Spark中,依赖关系被定义为两种类型:宽依赖(Shuffle Dependency)与窄依赖(Narrow Dependency)

  1. 窄依赖,父RDD的分区最多只会被子RDD的一个分区使用
  2. 宽依赖,父RDD的一个分区会被子RDD的多个分区使用(宽依赖指子RDD的每个分区都要依赖于父RDD的所有分区,这是shuffle类操作)

区分宽窄依赖,我们主要从父RDD的Partition流向来看:流向单个RDD就是窄依赖,流向多个RDD就是宽依赖。

  1. 对于窄依赖,子rdd一个分区数据丢失只需要对一个父rdd进行重算,重算利用率100%。
  2. 对于宽依赖,子rdd一个分区数据丢失需要多该分区依赖的所有父rdd分区进行重算,重算利用率低。

并行度

并行度(paralleism):在分布式计算框架中,一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正实现多个任务并行执行,记住,这里是并行,而不是并发,这里我们将整个集群并行执行任务的数量,成为并行度。

spark中的并行度和分区之间是有关系的,rdd的每一个分区都是一个task,然后传送到对应的executor中进行计算。如果资源充足(executor core数=task数)并行度就等于分区数,如果(executor core数< task数)就是并发执行。

spark根据分区数来决定task的个数,而task的个数和executor所拥有的core数来决定着spark的并行度,当task数多余core数时,就会产生并发操作

改变并行度(parallelism)

  1. 设置合理的task数量,至少设置成与spark Application (executor)的总cpu core 数量相同。比如:150个分区,150个task,150个core,差不多每个task同时运行完毕。(官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 ,比如150个cpu core ,基本设置 task数量为 300 ~ 500)
  2. 重新设置RDD的分区数,常见的方法有repartitions 、 coalesce、join、以及一些会产生宽依赖的算子。
  3. 一个stage的并行度由stage的最后一个rdd的分区决定。可以通过spark.default.parallelism可以设置当前stage的并行度

spark shuffle

shuffle是spark中数据重分发的一种机制,以便于在跨分区进行数据的分组。shuffle通常会引起executor
与节点之间的数据复制,这期间会有大量的网络I/O,磁盘I/O和数据的序列化。

  1. 在shuffle内部,单个map tasks的结果被保存在内存中,直到放不下为止。然后,根据目标分区对它们进行
    排序,并将它们写入单个文件。在reduce端,tasks会读取相关的经过排序的数据块。
  2. shuffle还会在磁盘上产生大量的中间文件,这样做是为了当触发重算的时候这些中间文件不用被重新创建。
  3. 垃圾收集可能会发生在很长的一段时间之后,如果应用程序保留了对这些RDD的引用,或者垃圾收集不经常启动的话这
    意味着对于一个运行时长较长的spark作业,它可能会消耗大量的磁盘空间。这些中间文件的存储目录在配置Spark
    Context时由spark.local.dir参数明确指定。

spark exactly once 语义随笔

Spark具备很好的机制来保证exactly once的语义,具体体现在数据源的可重放性、计算过程中的容错性、以及写入存储介质时的幂等性或者事务性。exactly once指的是在处理数据的过程中,系统有很好的容错性(fault-tolerance),能够保证数据处理不重不丢,每一条数据仅被处理一次。

语言所蕴含的意义就是语义(semantic)。简单的说,符号是语言的载体。符号本身没有任何意义,只有被赋予含义的符号才能够被使用,这时候语言就转化为了信息,而语言的含义就是语义。
语义可以简单地看作是数据所对应的现实世界中的事物所代表的概念的含义,以及这些含义之间的关系,是数据在某个领域上的解释和逻辑表示。

一、三种语义

spark exactly once 语义是spark很重要的一个概念。分布式系统中都基本上会有三种关键的语义

  • at most once:至多一次,数据可能会丢,但不会重复
  • at least once:至少一次,数据肯定不会丢失,但可能重复
  • exactly once:有且只有一次,数据不丢失不重复,且只出现一次; 该语义是最理想的,但也难以实现

Spark具备很好的机制来保证exactly once的语义,具体体现在数据源的可重放性、计算过程中的容错性、以及写入存储介质时的幂等性或者事务性。exactly once指的是在处理数据的过程中,系统有很好的容错性(fault-tolerance),能够保证数据处理不重不丢,每一条数据仅被处理一次。

一个典型的 Spark Streaming 应用程序会包含三个处理阶段:接收数据、处理汇总、输出结果。每个阶段都需要做不同的处理才能实现相应的语义

二、接收数据 – 数据源的可重放性

取决于上游数据源的特性

  • HDFS 这类支持容错的文件系统中读取文件,能够直接支持 Exactly-once 语义。
  • 上游消息系统支持 ACK(如RabbitMQ),我们就可以结合 Spark 的 Write Ahead Log 特性来实现 At-least-once 语义。
  • 非可靠的数据接收器(如 socketTextStream),当 Worker 或 Driver 节点发生故障时就会产生数据丢失,提供的语义也是未知的。
  • Kafka 消息系统是基于偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 语义

SparkStreaming接入Kafka的数据有两种模式

一种为Receiver模式,一种为Direct模式。

Receiver模式

Receiver模式都不能够实现exactly once的语义,其根本原因是Kafka自己管理的offset与SparkStreaming实际处理数据的offset没有同步导致的。

Receiver模式采用Kafka的高阶consumer API,Kafka自己封装了对数据的获取逻辑,且通过Zookeeper管理offset信息,这种模式在与SparkStreaming对接时,有以下特点:

  1. Kafka中的partition数量与SparkStreaming中的并行度不是一一对应的,SparkStreaming通过创建Receiver去读取Kafka中数据,createStream()方法传入的并发参数代表的是读取Kafka中topic+partition的线程数,并不能提高SparkStreaming读取数据的并行度。
  2. Kafka自己管理offset,Receiver作为一个高层的Consumer来消费数据,其消费的偏移量(offset)由Kafka记录在Zookeeper中,一旦出现错误,那些已经标记为消费过的数据将会丢失。
  3. Receiver模式下,为了解决读取数据时的并行度问题,可以创建多个DStream,然后union起来
  4. 为了解决数据丢失的问题,可以选择开启Spark的WAL(write ahead log)机制,每次处理数据前将预写日志写入到HDFS中,如果节点出现错误,可以从WAL中恢复。这种方法其实效率低下,不仅数据冗余(Kafka中有副本机制,Spark中还要存一份),且无法保证exactly once,数据可能重复消费。

Direct模式

Spark1.3中引入了Direct模式来替代Receiver模式,它使用Kafka的Simple consumer API,由Spark应用自己管理offset信息,以达成exactly once的语义,其特点如下:

  1. Kafka中的partition与SparkStreaming中的partition一一对应,也就是SparkStreaming读取数据的并行度取决于Kafka中partition的数量。
  2. 不依赖Receiver,而是通过低阶api直接找到topic+partition的leader获取数据,并由SparkStreaming应用自己负责追踪维护消费的offset。
  3. Direct模式下,SparkStreaming应用管理offset的方法案例,其中offset依然是存放在zookeeper中,但是由应用自身来管理的,offset也可以放在Redis、MySQL、HBase中进行管理,根据具体情况进行选择。

转化或汇总 – 计算过程中的容错性

Spark RDD 本身就是一种具备容错性、不变性、以及计算确定性的数据结构。只要数据来源是可用的,且处理过程中没有副作用(Side effect),我们就能一直得到相同的计算结果;撇去Driver与Executor的高可用性不说,Spark应用内部则采用checkpoint和lineage的机制来确保容错性。

lineage

一般翻译为血统,简单来说就是RDD在转化的过程中,由于父RDD与子RDD存在依赖关系(Dependency),从而形成的lineage,也可以理解为lineage串起了RDD DAG。

RDD可以进行缓存,通过调用persist或者cache方法,将RDD持久化到内存或者磁盘中,这样缓存的RDD就可以被保留在计算节点的内存中被重用,缓存是构建Spark快速迭代的关键。

当一个RDD丢失的情况下,Spark会去寻找它的父RDD是否已经缓存,如果已经缓存,就可以通过父RDD直接算出当前的RDD,从而避免了缓存之前的RDD的计算过程,且只有丢失数据的partition需要进行重算,这样Spark就避免了RDD上的重复计算,能够极大的提升计算速度。

缓存虽然可以提升Spark快速迭代计算的速度,但是缓存是会丢失的。

checkpoint

检查点机制就是为了可以切断lineage的依赖关系,在某个重要的节点,将RDD持久化到文件系统中(一般选择HDFS),这样就算之前的缓存已经丢失了,也可以保证检查点数据不会丢失,这样在恢复的时候,会直接从检查点的数据开始进行计算,检查点机制在SparkStreaming这种流式计算中发挥的作用会更大。

可以通过以下源码为入口进一步了解Spark的缓存和检查点机制,RDD在进行计算的时候会调用其iterator方法,在该方法中会首先去读取缓存的数据,如果没有缓存的数据则会去读取checkpoint的数据

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

Spark在计算过程中采用的lineage和checkpoint机制相互结合,取长补短,再加上Spark各个组件底层本身就是具有高可用性,所以在Spark应用在转化计算的过程中,可是保证数据处理的exactly once。

结果输出 – 写入存储介质的幂等性或事务性

幂等更新

  • 多次写入会产生相同的结果数据,我们可以认为这类写入操作是幂等的;saveAsTextFile 就是一种典型的幂等写入
  • 幂等写入只适用于 Map-only 型的计算流程,即没有 Shuffle、Reduce、Repartition 等操作
  • 对 Kafka DStream 做一些额外设置:
    1. 将 enable.auto.commit 设置为 false。默认情况下,Kafka DStream 会在接收到数据后立刻更新自己的偏移量,我们需要将这个动作推迟到计算完成之后。
    2. 打开 Spark Streaming 的 Checkpoint 特性,用于存放 Kafka 偏移量。但若应用程序代码发生变化,Checkpoint 数据也将无法使用,这就需要改用在数据输出之后手动提交 Kafka 偏移量。HasOffsetRanges 类,以及 commitAsync API 可以做到这一点
      messages.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition { iter =>
          // output to database
        }
        messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }

事务更新

  • 务型写入时,我们需要生成一个唯一 ID,这个 ID 可以使用当前批次的时间、分区号、或是 Kafka 偏移量来生成
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
  • 通常会在 foreachPartition 方法中来执行数据库写入操作。对于 Map-only 流程来说是适用的
  • 这种流程下 Kafka 分区和 RDD 分区是一一对应的,我们可以用以下方式获取各分区的偏移量:
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val offsetRange = offsetRanges(TaskContext.get.partitionId)
  }
}
  • 对于包含 Shuffle 的计算流程(如上文的错误日志统计),我们需要先将处理结果拉取到 Driver 进程中,然后才能执行事务操作;如果偏移量写入失败,或者重复处理了某一部分数据(offset != $fromOffset 判断条件不通过),该事务就会回滚,从而做到 Exactly-once。
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val result = processLogs(rdd).collect() // parse log and count error
  DB.localTx { implicit session =>
    result.foreach { case (time, count) =>
      // save to error_log table
    }
    offsetRanges.foreach { offsetRange =>
      val affectedRows = sql"""
      update kafka_offset set offset = ${offsetRange.untilOffset}
      where topic = ${topic} and `partition` = ${offsetRange.partition}
      and offset = ${offsetRange.fromOffset}
      """.update.apply()

      if (affectedRows != 1) {
        throw new Exception("fail to update offset")
      }
    }
  }
}

exactly once固然是个理想的状态,但其实现成本也是非常高的,在对数据可靠性要求不是很高的场景中,at-least-once甚至丢失少量数据也是可以作为一个选项考虑的

总结

官方已经不再推荐使用Receiver模式

Receiver接收方式

  • 多个Receiver接受数据效率高,但有丢失数据的风险
  • 开启日志(WAL)可防止数据丢失,但写两遍数据效率低。
  • Zookeeper维护offset有重复消费数据可能。
  • 使用高层次的API

Direct直连方式

  • 不使用Receiver,直接到kafka分区中读取数据
  • 不使用日志(WAL)机制
  • Spark自己维护offset
  • 使用低层次的API

hive compile 源码编译学习随笔

编译的源码来源于什么地方?如何知道互相兼容的版本号?用什么工具编译?和辅助的工具有哪些?编译有bug时如何快速定位到问题? 如果解决问题;我们第一点的基本思路是: ”先明确hive的版本,同时希望这个版本的hive去兼容某个版本的Hadoop“

1、一些问题

  • 编译的源码来源于什么地方?
  • 如何知道互相兼容的版本号?
  • 用什么工具编译?和辅助的工具有哪些
  • 编译有bug时如何快速定位到问题? 如果解决问题

2、编译的版本

Hadoop: 3.1.3
hive : 3.1.3
spark: 3.1.3

* 为什么都选择3.1.3,在hive 的官方地址,是有如下图说明的,那我们就默认hive3.1.3和hadoop3.1.3是完全兼容的(实际上是有bug的);以这两个版本的Hadoop和hive搭建的集群在启动hive的时候会报兼容性错误。

hive文档说明

实际上,我们要编译的组件一般不会是hadoop ,多数情况是hive,spark等组件,主要是因为这些组件如果基于Hadoop生态的运行的话,那都是这些组件去兼容Hadoop,故我们这里是主要编译hive。

也就是我们第一点的基本思路是: ”先明确hive的版本,同时希望这个版本的hive去兼容某个版本的Hadoop

3、寻找源码

源码基本上都是去GitHub上的maven仓库里找

地址:https://github.com/apache/hive 或者 https://gitee.com/apache/hive

此地址看到的hive的master分支,我们可以点击master的下拉框,然后输入我们的版本号,注意我们要编译的是具体的版本不是分支,所以要搜索tags,搜索后如下图。这个时候其实主要是搜一下看看有没有我们要编译的版本以及该版本下对应的其它组件版本号

切换到该版本后,找pom.xml文件,点击打开;我们可以直接在打开的页面搜索 hadoop ,多找一下就会发现hive3.1.3对应的Hadoop版本其实是 <hadoop.version>3.1.0</hadoop.version>,也就是我们之前所说的,如果把hive3.1.3放在hadoop3.1.3上搭建环境,大概率会出现问题的。

github选择hive3.1.3

4、配置环境

  • MacBook Pro (13-inch, M1, 2020), macOS Monterey
  • java: jdk1.8.0_212
  • maven: Apache Maven 3.8.6
  • IntelliJ IDEA 2022.2.2 (Community Edition)
  • git version 2.37.3

4.1 idea git 插件下载源码

  • idea 打开后 通过 Get from CVS的方式,填入github上hive的地址
  • 下载后在idea的右下角master的地方点一下,然后点击 。 checkout tag or version,输入3.1.3我们需要的版本点击确定。则branch就切换到该版本了。
  • 无论是 master分支还是我们想要切换的分支遇到提示 。 reload maven projects 的弹出框都点击yes
  • 此时刷新maven插件,基本上大概率是会报错的。不管是 pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde 和ldap-client-api:pom的哪一个错误,我们去搜索资料都能找到一些解决方案,但是基本上都是不可用的。
  • 为什么说报错在此篇文章书写时没法解决,主要是因为maven的问题,maven此时是最新版,而且从3.8.1开始maven block掉了所有HTTP协议的repositories,所以针对前边提到的各种错误,不管搜到什么答案,大概率都是没法有效解决的。笔者的处理方案是手动下载一个maven的tar包,配置一个maven3.6.3的版本,也就是次新的大版本
  • 刷新maven插件,基本上不会报错了;在Terminal输入 。 mvn clean package -Pdist -DskipTests -Dmaven.javadoc.skip=true,回车编译来验证当前的3.1.3版本是否有问题。

4.2 遇到的错误

Could not find artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde in nexus-aliyun
网上多数说是增加阿里云的spring- plugin的依赖,但是实际上没有效果,主要是此时的spring- plugin依赖里也没有这个jar包,需要增加spring自身的仓库支持
https://www.codeleading.com/article/91364854522/
5.1.5-jhyde
Failure to find org.glassfish:javax.el:pom:
此问题可以在此处不关注它,
glassfish:javax.el:pom
idea maven 一直报错“Could not transfer artifact
也可以不关注此问题,但是有专门的此问题场景解决方案
https://blog.51cto.com/u_15162069/2804511
Could not transfer artifact
Blocked mirror for repositories
最新版本的maven block掉了所有HTTP协议的repositories,仅支持https;maven次新大版本是3.6.3,是不会出现这个问题的。所以手动降版本,这也一下解决好几个问题
https://www.jianshu.com/p/274a45a2db05https://blog.csdn.net/qq_41980563/article/details/122061818
Blocked mirror for repositories

5、修改Hadoop版本号及处理报错

  • 视频教程里 : https://www.bilibili.com/video/BV1x14y177Ab?p=7&vd_source=b2e82cf2c96f0ecd7432e09d95d7d8ec 直接在集群中调用hive触发里问题,从而快速的找到了主要兼容的jar包,但是一般我们习惯性直接将hive源码的pom文件hadoop设置成我们希望的版本 3.1.3,直接编译,笔者尝试过,直接改 Hadoop3.1.2 这样编译时不出现问题。
  • 笔者并没有完全直接去修改主要的兼容报错的jar包, 在编译时遇到了各种报错,笔者没有采用视频摘樱桃的方式,而是逐一来修改涉及到的代码
  • java的某个类或者方法开始计算停用的时候会在官方API文档中开始做上 。 Deprecated 标记,有这个单词的方法说明官方在通知各位这个方法要做好放弃的准备了,它一般也同时给出解决方案和停用大概时间

6、修改的java文件及主要问题

  • 组件版本升级时涉及大到代码适配的工作,主要就是针对代码中的错误,去定位对应包中类的方法是在什么时候做出来了改变,而官方的API文档中都会有相应的痕迹,耐心的去文档里大版本的对应着找一找就可以找到了。(所谓大版本,就是类似于1.0,2.0,而中间的1.01,1.02 这些可以暂时跳过,除非某两个大版本相应的方法直接发生了变化,这时需要细化中间的小版本来看一下具体的情况,但一般都是大版本发布某方法要被弃用了,替换了的声明)
  • 有时候,你不需要去懂怎么回事,只要找到关键位置,照葫芦画瓢的做就好了
  • 其实要修改的还有很多地方,因为hive有很多的 module,每个module都可能指定不同的版本,而只有在某些特定场运行时的场景下才可能触发bug,但是现在编译确实是能通过。其它具体的问题就得去进一步慢慢验证修复
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.6.1:compile (default-compile) on project hive-druid-handler: Compilation failure[ERROR] /Users/ethan/IdeaProjects/hive/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java:[46,61] <T>emptyIterator()在com.google.commn.collect.Iterators中不是公共的; 无法从外部程序包中对其进行访问
import java.util.Collections;// 替换了如下的代码// private Iterator<List<Object>> compactedValues = Iterators.emptyIterator(); private Iterator<List<Object>> compactedValues = Collections.emptyIterator();
此问题找解决方法的过程是视频中说的方式,去API文档找报错的方法是在什么时候开始过渡停用的。整个修改的思路是差不多的,单独列出来这个是除了这个类,其它的修改基本上都是一样的方法,一样的处理方式
Iterators中不是公共的
git status刷新索引: 100% (17020/17020), 完成.位于分支 my-hive-3.1.3尚未暂存以备提交的变更: (使用 “git add <文件>…” 更新要提交的内容) (使用 “git restore <文件>…” 丢弃工作区的改动) 修改: druid-handler/pom.xml 修改: druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java 修改: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java 修改: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java 修改: pom.xml 修改: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java 修改: ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
以上是所有修改的地方,pom文件的修改主要是修改guava的版本号
修改guava的版本号

7、替换spark

先修改pom文件,替换spark的版本,spark3.x 对应的Scala版本最低是2.12,所以同时把Scala的版本也修改掉,可能有读者会问,怎么知道Scala也得变的,不要etc,自己去官方文档看一下基本的要求就知道了。

<spark.version>3.1.3</spark.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.10</scala.version><SPARK_SCALA_VERSION>2.12</SPARK_SCALA_VERSION>

然后继续在Terminal通过命令: mvn clean package -Pdist -DskipTests -Dmaven.javadoc.skip=true, 来从新编译,笔者遇到了大致三个问题

7.1 SparkCounter 中累加器的修改

这里主要是设计到spark自定义累加器实现的继承类替换问题。具体远离参考 spark 累加器学习随笔

7.2 ShuffleWriteMetrics.java 报错

metrics.shuffleWriteMetrics().ShuffleBytesWritten()不存在,从名称看类似的方法为bytesWritten()metrics.shuffleWriteMetrics().ShuffleWriteTime()同样不存在,修改为writeTime()
public ShuffleWriteMetrics(TaskMetrics metrics) { // metrics.shuffleWriteMetrics().ShuffleBytesWritten()不存在,从名称看类似的方法为bytesWritten() // metrics.shuffleWriteMetrics().ShuffleWriteTime()同样不存在,修改为writeTime() this(metrics.shuffleWriteMetrics().bytesWritten(), metrics.shuffleWriteMetrics().writeTime()); }
ShuffleBytesWritten()不存在

7.3 找不到spark_project

编译报错提示org.spark_project.guava····· 不存在,找到相关报错的java文件,修改import org.spark_project.guava.collect.Sets;为 import org.sparkproject.guava.collect.Sets;。保存后,重新安装依赖