MapReduce 基础 工作原理

mapreduce 是什么?

MapReduce是Hadoop的核心之一,可以分成Map和Reduce两部分理解。

1.Map:映射过程,把一组数据按照某种Map函数映射成新的数据。我们将这句话拆分提炼出重要信息,也就是说,map主要是:映射、变换、过滤的过程。一条数据进入map会被处理成多条数据,也就是1进N出。

2.Reduce:归纳过程,把若干组映射结果进行汇总并输出。我们同样将重要信息提炼,得到reduce主要是:分解、缩小、归纳的过程。一组数据进入reduce会被归纳为一组数据(或者多组数据),也就是一组进N出。

graph LR
    A[输入数据集] --map--> B[中间结果数据集]
    B--reduce-->C[最终结果数据集]

mapreduce的基本工作流程

分片、格式化数据源

输入 Map 阶段的数据源,必须经过分片和格式化操作。

分片操作:

指的是将源文件划分为大小相等的小数据块( Hadoop 2.x 中默认 128MB ),也就是分片( split ),
Hadoop 会为每一个分片构建一个 Map 任务,并由该任务运行自定义的 map() 函数,从而处理分片里的每一条记录;

格式化操作:

将划分好的分片( split )格式化为键值对形式的数据,其中, key 代表偏移量, value 代表每一行内容。

执行 MapTask

每个 Map 任务都有一个内存缓冲区(缓冲区大小 100MB ),输入的分片( split )数据经过 Map 任务处理后的中间结果会写入内存缓冲区中。
如果写人的数据达到内存缓冲的阈值( 80MB ),会启动一个线程将内存中的溢出数据写入磁盘,同时不影响 Map 中间结果继续写入缓冲区。
在溢写过程中, MapReduce 框架会对 key进行排序,如果中间结果比较大,会形成多个溢写文件,最后的缓冲区数据也会全部溢写入磁盘形成一个溢写文件,如果是多个溢写文件,则最后合并所有的溢写文件为一个文件。

执行 Shuffle 过程

MapReduce 工作过程中, Map 阶段处理的数据如何传递给 Reduce 阶段,这是 MapReduce 框架中关键的一个过程,这个过程叫作 Shuffle 。
Shuffle 会将 MapTask 输出的处理结果数据分发给 ReduceTask ,并在分发的过程中,对数据按 key 进行分区和排序。

执行 ReduceTask

输入 ReduceTask 的数据流是形式,用户可以自定义 reduce()方法进行逻辑处理,最终以的形式输出。

写入文件

MapReduce 框架会自动把 ReduceTask 生成的传入 OutputFormat 的 write 方法,实现文件的写入操作。
graph LR
  subgraph 整体流程
    direction TB
    A1[分片 格式化] --> A2[执行Map Task]
    A2-->A3[执行Shuffle]
    A3-->A4[执行Reduce Task]
    A4-->A5[落盘 写入文件]
  end
  整体流程 --> B[分片 格式化]
  整体流程 --> C[执行Map Task]
  整体流程-->D[执行Shuffle]
  整体流程-->E[执行Reduce Task]
  整体流程-->F[落盘 写入文件]
  B-->B1(分片 128M =>MapTask.map)
  B-->B2(格式化 split=>key,value) -->B3(exg: wordcount key 偏移量  value 每一行的内容)
  C-->C1(1 每个 Map 任务都有一个内存缓冲区 100MB )
  C-->C2(2 map处理后的结果写入内存缓存区)
  C-->C3(3 内存缓存区达到80% key排序溢写到文件)
  C-->C4(4 多个溢写文件合并)
  D-->D1(maptask=>reducetask 分发和排序)
  E-->E1(key, value-list => key value)
  F-->F1(key,value => OutputFormat.write)

MapTask 详解

  1. Read 阶段: MapTask 通过用户编写的 RecordReader ,从输入的 InputSplit 中解析出一个个 key / value 。

  2. Map 阶段:将解析出的 key / value 交给用户编写的 Map ()函数处理,并产生一系列新的 key / value 。

  3. Collect 阶段:在用户编写的 map() 函数中,数据处理完成后,一般会调用 outputCollector.collect() 输出结果,在该函数内部,它会将生成的 key / value 分片(通过调用 partitioner ),并写入一个环形内存缓冲区中(该缓冲区默认大小是 100MB )。

  4. Spill 阶段:即“溢写”,当缓冲区快要溢出时(默认达到缓冲区大小的 80 %),会在本地文件系统创建一个溢出文件,将该缓冲区的数据写入这个文件。

    将数据写入本地磁盘前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
    写入磁盘之前,线程会根据 ReduceTask 的数量,将数据分区,一个 Reduce 任务对应一个分区的数据。
    这样做的目的是为了避免有些 Reduce 任务分配到大量数据,而有些 Reduce 任务分到很少的数据,甚至没有分到数据的尴尬局面。
    如果此时设置了 Combiner ,将排序后的结果进行 Combine 操作,这样做的目的是尽可能少地执行数据写入磁盘的操作。
  5. Combine 阶段:当所有数据处理完成以后, MapTask 会对所有临时文件进行一次合并,以确保最终只会生成一个数据文件

    合并的过程中会不断地进行排序和 Combine 操作,
    其目的有两个:一是尽量减少每次写人磁盘的数据量;二是尽量减少下一复制阶段网络传输的数据量。
    最后合并成了一个已分区且已排序的文件。

ReduceTask 详解

  1. Copy 阶段: Reduce 会从各个 MapTask 上远程复制一片数据(每个 MapTask 传来的数据都是有序的),并针对某一片数据,如果其大小超过一定國值,则写到磁盘上,否则直接放到内存中

  2. Merge 阶段:在远程复制数据的同时, ReduceTask 会启动两个后台线程,分别对内存和磁盘上的文件进行合并,以防止内存使用过多或者磁盘文件过多。

  3. Sort 阶段:用户编写 reduce() 方法输入数据是按 key 进行聚集的一组数据。

    为了将 key 相同的数据聚在一起, Hadoop 采用了基于排序的策略。
    由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此, ReduceTask 只需对所有数据进行一次归并排序即可。
  4. Reduce 阶段:对排序后的键值对调用 reduce() 方法,键相等的键值对调用一次 reduce()方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到 HDFS 中

  5. Write 阶段: reduce() 函数将计算结果写到 HDFS 上。

    合并的过程中会产生许多的中间文件(写入磁盘了),但 MapReduce 会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到 Reduce 函数。

https://blog.csdn.net/weixin_43542605/article/details/122288056

Hadoop HA 基本认知

Hadoop HA(High Available)经过同时配置两个处于Active/Passive模式的Namenode,分别叫Active Namenode和Standby Namenode。 Standby Namenode做为热备份,从而容许在机器发生故障时可以快速进行故障转移,同时在平常维护的时候使用优雅的方式进行Namenode切换。Namenode只能配置一主一备,不能多于两个Namenode。

Hadoop HA基本流程

集群总体上可以分为三部分:NameNode集群、JournalNode集群和Zookeeper集群。NameNode在某一时刻只有一个处于活跃状态,其他的都处于standby状态;JournalNode负责把edits文件传到standby的NameNode上;Zookeeper负责监控NameNode宕机情况,ZKFC(ZookeeperFailoverController)是专门监控NameNode健康的。

为了同步NameNode的元数据一致,有专门的JournalNode来同步元数据文件,活跃的NameNode的edits文件会写入journalnode集群,其他standby的结点会去读取journalnode上的edits文件,以此来同步自身的元数据。

  1. ZKFC的HealthyMonitor是监控NameNode的进程,是专门监控NameNode将康情况的进程。
  2. HealthyMonitor会定时想ZKFC进程报告NameNode情况。
  3. 当HealthyMonitor出现汇报了NameNode,ZKFC就会向AcitveStandbyEloctor报告。
  4. AcitveStandbyEloctor接到NameNode宕机报告就会通知zk集群选举出新的NameNode。
  5. zk集群经过内部选举,返回一个standby的NameNode给AcitveStandbyEloctor。
  6. AcitveStandbyEloctor想ZKFC报告选举结果。
  7. ZKFC为了防止是网络原因导致NameNode假死,就会结束NameNode进程。
  8. zk集群就会通知另一个ZKFC要求它修改它监控的NameNode的进程为活跃节点。

HA技术关键点

HA问题中需要解决的两个问题:

  • 元数据一致性:Standby节点和Active节点的元数据一致性。
  • 主备自动切换:Active节点服务中断时,Standby节点可以立即启动对外提供服务。

为了确保故障转移可以快速完成,Standby Namenode须要维护最新的Block位置信息,即每一个Block副本存放在集群中的哪些节点上。为了达到这一点,Datanode同时配置主备两个Namenode,并同时发送Block报告和心跳到两台Namenode。

确保任什么时候刻只有一个Namenode处于Active状态很是重要,不然可能出现数据丢失或者数据损坏。当两台Namenode都认为本身的Active Namenode时,会同时尝试写入数据(不会再去检测和同步数据)。为了防止这种脑裂现象,Journal Nodes只容许一个Namenode写入数据,内部经过维护epoch数来控制,从而安全地进行故障转移。

HA其本质上就是要保证主备NN元数据是保持一致的,即保证fsimage和editlog在备NN上也是完整的。元数据的同步很大程度取决于EditLog的同步,而这步骤的关键就是共享文件系统

有两种方式能够进行edit log共享:

  1. 使用QJM(Quorum Journal Manager)共享edit log
  2. 使用NFS(Network File System)共享edit log(存储在NAS/SAN)

NFS的方式

all name space edits logged to shared storage;Block reports are sent to both name nodes

显然NFS做为主备Namenode的共享存储。这种方案可能会出现脑裂(split-brain),即两个节点都认为本身是主Namenode并尝试向edit log写入数据,这可能会致使数据损坏。经过配置fencin脚原本解决这个问题,fencing脚本用于:

  • 将以前的Namenode关机
  • 禁止以前的Namenode继续访问共享的edit log文件

使用这种方案,管理员就能够手工触发Namenode切换,而后进行升级维护。但这种方式存在如下问题:

  • 只能手动进行故障转移,每次故障都要求管理员采起措施切换。
  • NAS/SAN设置部署复杂,容易出错,且NAS自己是单点故障。
  • Fencing 很复杂,常常会配置错误。
  • 没法解决意外(unplanned)事故,如硬件或者软件故障。oop
    所以须要另外一种方式来处理这些问题:
  • 自动故障转移(引入ZooKeeper达到自动化)
  • 移除对外界软件硬件的依赖(NAS/SAN)
  • 同时解决意外事故及平常维护致使的不可用

Quorum Journal Manager

QJM(Quorum Journal Manager)是Hadoop专门为Namenode共享存储开发的组件,一般是奇数点结点组成。其集群运行一组Journal Node,每一个Journal 节点暴露一个简单的RPC接口,容许Namenode读取和写入数据,数据存放在Journal节点的本地磁盘。当Namenode写入edit log时,NameNode会同时向所有JournalNode并行写文件,当超过半数节点回复确认成功写入以后,edit log就认为是成功写入。

1、 QJM写过程

NameNode 会把 EditLog 同时写到本地和 JournalNode 中。写本地由配置中的参数dfs.namenode.name.dir来控制,写JN由参数dfs.namenode.shared.edits.dir控制,在写EditLog时会由两个不同的输出流来控制日志的写过程,分别是:

  • EditLogFileOutputStream(本地输出流)
  • QuorumOutputStream(JN输出流)

NameNode在写EditLog时,并不是直接写到磁盘中,为保证高吞吐,NameNode会分别为EditLogFileOutputStream和QuorumOutputStream定义两个同等大小的Buffer,大小大概是512KB,一个写Buffer(buffCurrent),一个同步Buffer(buffReady),这样可以一边写一边同步,所以EditLog是一个异步写过程,同时也是一个批量同步的过程,避免每写一笔就同步一次日志。

这个是怎么实现边写边同步的呢,这中间其实是有一个缓冲区交换的过程,即bufferCurrent和buffReady在达到条件时会触发交换,如bufferCurrent在达到阈值同时bufferReady的数据又同步完时,bufferReady数据会清空,同时会将bufferCurrent指针指向bufferReady以满足继续写,另外会将bufferReady指针指向bufferCurrent以提供继续同步EditLog。

flowchart TD
    A((Client)) --修改--> B(NameNode)
    B-->C(本地 \n EditLogFileOutputStream)
    B-->D(JournalNode \n QuorumOutputStream)
    C-->E(bufferCurrent)
    C-->F(bufferReady)
    D-->H(bufferCurrent)
    D-->I(bufferReady)
    E-->M(本地目录)
    F-->M
    H-->N(JournalNode)
    I-->N

这里有一个问题,既然EditLog是异步写的,怎么保证缓存中的数据不丢呢,其实这里虽然是异步,但实际所有日志都需要通过logSync同步成功后才会给client返回成功码,假设某一时刻NameNode不可用了,其内存中的数据其实是未同步成功的,所以client会认为这部分数据未写成功。

(1)隔离双写

在Active NN每次同步EditLog到JN时,先要保证不会有两个NN同时向JN同步日志。这涉及一个很重要的概念Epoch Numbers,很多分布式系统都会用到。

成为Active结点时,其会被赋予一个EpochNumber,每个EpochNumber是惟一的,不会有相同的EpochNumber出现。EpochNumber有严格顺序保证,每次NN切换后其EpochNumber都会自增1,后面生成的EpochNumber都会大于前面的EpochNumber。QJM是怎么保证上面特性的呢,主要有以下几点:

  1. 在对EditLog作任何修改前,QJM(NameNode上)必须被赋予一个EpochNumber;
  2. QJM把自己的EpochNumber通过newEpoch(N)的方式发送给所有JN结点;
  3. 当JN收到newEpoch请求后,会把QJM的EpochNumber保存到一个lastPromisedEpoch变量中并持久化到本地磁盘;
  4. ANN同步日志到JN的任何RPC请求(如logEdits(),startLogSegment()等),都必须包含ANN的EpochNumber;
  5. JN在收到RPC请求后,会将之与lastPromisedEpoch对比,如果请求的EpochNumber小于lastPromisedEpoch,将会拒绝同步请求,反之,会接受同步请求并将请求的EpochNumber保存在lastPromisedEpoch;

这样就能保证主备NN发生切换时,就算同时向JN同步日志,也能保证日志不会写乱,因为发生切换后,原ANN的EpochNumber肯定是小于新ANN的EpochNumber,所以原ANN向JN的发起的所有同步请求都会拒绝,实现隔离功能,防止了脑裂。

(2)恢复in-process日志

如果在写过程中写失败了,可能各个JN上的EditLog的长度都不一样,需要在开始写之前将不一致的部分恢复。恢复机制如下:

  1. Active NN先向所有JN发送getJournalState请求;
  2. JN会向ANN返回一个Epoch(lastPromisedEpoch);
  3. Active NN收到大多数JN的Epoch后,选择最大的一个并加1作为当前新的Epoch,然后向JN发送新的newEpoch请求,把新的Epoch下发给JN;
  4. JN收到新的Epoch后,和lastPromisedEpoch对比,若更大则更新到本地并返回给Active NN自己本地一个最新EditLogSegment起始事务Id,若小则返回NN错误;
  5. Active NN收到多数JN成功响应后认为Epoch生成成功,开始准备日志恢复;
  6. Active NN会选择一个最大的EditLogSegment事务ID作为恢复依据,然后向JN发送prepareRecovery; RPC请求,对应Paxos协议2p阶段的Phase1a,若多数JN响应prepareRecovery成功,则可认为Phase1a阶段成功;
  7. Active NN选择进行同步的数据源,向JN发送acceptRecovery RPC请求,并将数据源作为参数传给JN。
  8. JN收到acceptRecovery请求后,会从JournalNodeHttpServer下载EditLogSegment并替换到本地保存的EditLogSegment,对应Paxos协议2p阶段的Phase1b,完成后返回Active NN请求成功状态。
  9. Active NN收到多数JN的响应成功请求后,向JN发送finalizeLogSegment请求,表示数据恢复完成,这样之后所有JN上的日志就能保持一致。 数据恢复后,Active NN上会将本地处于in-process状态的日志更名为finalized状态的日志,形式如editsstart-txidstop-txid。

(3)日志同步

  1. 执行logSync过程,将ANN上的日志数据放到缓存队列中
  2. 将缓存中数据同步到JN,JN有相应线程来处理logEdits请求
  3. JN收到数据后,先确认EpochNumber是否合法,再验证日志事务ID是否正常,将日志刷到磁盘,返回ANN成功码
  4. ANN收到JN成功请求后返回client写成功标识,若失败则抛出异常

通过上面一些步骤,日志能保证成功同步到JN,同时保证JN日志的一致性,进而备NN上同步日志时也能保证数据是完整和一致的。

2、QJM读过程

读过程是面向备NN(Standby NN)的,Standby NN定期检查JournalNode上EditLog的变化,然后将EditLog拉回本地。Standby NN上有一个线程StandbyCheckpointer,会定期将Standby NN上FSImage和EditLog合并,并将合并完的FSImage文件传回主NN(Active NN)上,就是所说的Checkpointing过程。下面我们来看下Checkpointing是怎么进行的。

在2.x版本中,已经将原来的由SecondaryNameNode主导的Checkpointing替换成由Standby NN主导的Checkpointing。

  1. 在Standby NN上先检查前置条件,前置条件包括两个方面:距离上次Checkpointing的时间间隔和EditLog中事务条数限制。
  2. 前置条件任何一个满足都会触发Checkpointing,然后SNN会将最新的NameSpace数据即SNN内存中当前状态的元数据保存到一个临时的fsimage文件( fsimage.ckpt)
  3. 然后比对从JN上拉到的最新EditLog的事务ID,将fsimage.ckpt_中没有,EditLog中有的所有元数据修改记录合并一起并重命名成新的fsimage文件,同时生成一个md5文件。
  4. 将最新的fsimage再通过HTTP请求传回ANN。

通过定期合并fsimage有什么好处?

  1. 可以避免EditLog越来越大,合并成新fsimage后可以将老的EditLog删除
  2. 可以避免主NN(ANN)压力过大,合并是在SNN上进行的
  3. 可以保证fsimage保存的是一份最新的元数据,故障恢复时避免数据丢失

三、主备自动切换
Hadoop的主备选举依赖于ZooKeeper。
整个切换过程是由ZKFC来控制的,ZKFC是实现主备切换的组件。每个运行的NameNode上都会有一个ZKFC进程(实际是一个Hadoop进程)。主要的功能如下:

  1. 健康检测:ZKFC会使用健康检测命令定期的ping同节点中的NameNode,只要该NameNode及时的回复健康,则任务当前NameNode是健康的;
  2. Zookeeper会话管理: 当本地NameNode是健康的,ZKFC会保持一个在Zookeeper中打开的会话。如果本地NameNode处于Active状态,ZKFC会保持一个特殊的znode锁,如果回话中断,锁节点讲自动删除;
  3. 基于Zookeeper的选举: 如果本地的NameNode是健康的,且ZKFC发现没有其他的节点持有当前的znode锁,它会为自己获取该锁。如果成功则进行故障切换,并且确保之前的NameNode的进程中断,将本地NameNode切换为Active;

在故障切换期间,ZooKeeper主要是发挥什么作用有以下几点:

  1. 失败保护:集群中每一个NameNode都会在ZooKeeper维护一个持久的session,机器一旦挂掉,session就会过期,故障迁移就会触发;
  2. Active NameNode选择:ZooKeeper有一个选择ActiveNN的机制,一旦现有的ANN宕机,其他NameNode可以向ZooKeeper申请排他成为下一个Active节点;
  3. 防脑裂: ZK本身是强一致和高可用的,可以用它来保证同一时刻只有一个活动节点;

参考链接:
https://blog.csdn.net/weixin_43854618/article/details/108808274

https://blog.csdn.net/shan19920501/article/details/124911283

hive compile 源码编译学习随笔

编译的源码来源于什么地方?如何知道互相兼容的版本号?用什么工具编译?和辅助的工具有哪些?编译有bug时如何快速定位到问题? 如果解决问题;我们第一点的基本思路是: ”先明确hive的版本,同时希望这个版本的hive去兼容某个版本的Hadoop“

1、一些问题

  • 编译的源码来源于什么地方?
  • 如何知道互相兼容的版本号?
  • 用什么工具编译?和辅助的工具有哪些
  • 编译有bug时如何快速定位到问题? 如果解决问题

2、编译的版本

Hadoop: 3.1.3
hive : 3.1.3
spark: 3.1.3

* 为什么都选择3.1.3,在hive 的官方地址,是有如下图说明的,那我们就默认hive3.1.3和hadoop3.1.3是完全兼容的(实际上是有bug的);以这两个版本的Hadoop和hive搭建的集群在启动hive的时候会报兼容性错误。

hive文档说明

实际上,我们要编译的组件一般不会是hadoop ,多数情况是hive,spark等组件,主要是因为这些组件如果基于Hadoop生态的运行的话,那都是这些组件去兼容Hadoop,故我们这里是主要编译hive。

也就是我们第一点的基本思路是: ”先明确hive的版本,同时希望这个版本的hive去兼容某个版本的Hadoop

3、寻找源码

源码基本上都是去GitHub上的maven仓库里找

地址:https://github.com/apache/hive 或者 https://gitee.com/apache/hive

此地址看到的hive的master分支,我们可以点击master的下拉框,然后输入我们的版本号,注意我们要编译的是具体的版本不是分支,所以要搜索tags,搜索后如下图。这个时候其实主要是搜一下看看有没有我们要编译的版本以及该版本下对应的其它组件版本号

切换到该版本后,找pom.xml文件,点击打开;我们可以直接在打开的页面搜索 hadoop ,多找一下就会发现hive3.1.3对应的Hadoop版本其实是 <hadoop.version>3.1.0</hadoop.version>,也就是我们之前所说的,如果把hive3.1.3放在hadoop3.1.3上搭建环境,大概率会出现问题的。

github选择hive3.1.3

4、配置环境

  • MacBook Pro (13-inch, M1, 2020), macOS Monterey
  • java: jdk1.8.0_212
  • maven: Apache Maven 3.8.6
  • IntelliJ IDEA 2022.2.2 (Community Edition)
  • git version 2.37.3

4.1 idea git 插件下载源码

  • idea 打开后 通过 Get from CVS的方式,填入github上hive的地址
  • 下载后在idea的右下角master的地方点一下,然后点击 。 checkout tag or version,输入3.1.3我们需要的版本点击确定。则branch就切换到该版本了。
  • 无论是 master分支还是我们想要切换的分支遇到提示 。 reload maven projects 的弹出框都点击yes
  • 此时刷新maven插件,基本上大概率是会报错的。不管是 pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde 和ldap-client-api:pom的哪一个错误,我们去搜索资料都能找到一些解决方案,但是基本上都是不可用的。
  • 为什么说报错在此篇文章书写时没法解决,主要是因为maven的问题,maven此时是最新版,而且从3.8.1开始maven block掉了所有HTTP协议的repositories,所以针对前边提到的各种错误,不管搜到什么答案,大概率都是没法有效解决的。笔者的处理方案是手动下载一个maven的tar包,配置一个maven3.6.3的版本,也就是次新的大版本
  • 刷新maven插件,基本上不会报错了;在Terminal输入 。 mvn clean package -Pdist -DskipTests -Dmaven.javadoc.skip=true,回车编译来验证当前的3.1.3版本是否有问题。

4.2 遇到的错误

Could not find artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde in nexus-aliyun
网上多数说是增加阿里云的spring- plugin的依赖,但是实际上没有效果,主要是此时的spring- plugin依赖里也没有这个jar包,需要增加spring自身的仓库支持
https://www.codeleading.com/article/91364854522/
5.1.5-jhyde
Failure to find org.glassfish:javax.el:pom:
此问题可以在此处不关注它,
glassfish:javax.el:pom
idea maven 一直报错“Could not transfer artifact
也可以不关注此问题,但是有专门的此问题场景解决方案
https://blog.51cto.com/u_15162069/2804511
Could not transfer artifact
Blocked mirror for repositories
最新版本的maven block掉了所有HTTP协议的repositories,仅支持https;maven次新大版本是3.6.3,是不会出现这个问题的。所以手动降版本,这也一下解决好几个问题
https://www.jianshu.com/p/274a45a2db05https://blog.csdn.net/qq_41980563/article/details/122061818
Blocked mirror for repositories

5、修改Hadoop版本号及处理报错

  • 视频教程里 : https://www.bilibili.com/video/BV1x14y177Ab?p=7&vd_source=b2e82cf2c96f0ecd7432e09d95d7d8ec 直接在集群中调用hive触发里问题,从而快速的找到了主要兼容的jar包,但是一般我们习惯性直接将hive源码的pom文件hadoop设置成我们希望的版本 3.1.3,直接编译,笔者尝试过,直接改 Hadoop3.1.2 这样编译时不出现问题。
  • 笔者并没有完全直接去修改主要的兼容报错的jar包, 在编译时遇到了各种报错,笔者没有采用视频摘樱桃的方式,而是逐一来修改涉及到的代码
  • java的某个类或者方法开始计算停用的时候会在官方API文档中开始做上 。 Deprecated 标记,有这个单词的方法说明官方在通知各位这个方法要做好放弃的准备了,它一般也同时给出解决方案和停用大概时间

6、修改的java文件及主要问题

  • 组件版本升级时涉及大到代码适配的工作,主要就是针对代码中的错误,去定位对应包中类的方法是在什么时候做出来了改变,而官方的API文档中都会有相应的痕迹,耐心的去文档里大版本的对应着找一找就可以找到了。(所谓大版本,就是类似于1.0,2.0,而中间的1.01,1.02 这些可以暂时跳过,除非某两个大版本相应的方法直接发生了变化,这时需要细化中间的小版本来看一下具体的情况,但一般都是大版本发布某方法要被弃用了,替换了的声明)
  • 有时候,你不需要去懂怎么回事,只要找到关键位置,照葫芦画瓢的做就好了
  • 其实要修改的还有很多地方,因为hive有很多的 module,每个module都可能指定不同的版本,而只有在某些特定场运行时的场景下才可能触发bug,但是现在编译确实是能通过。其它具体的问题就得去进一步慢慢验证修复
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.6.1:compile (default-compile) on project hive-druid-handler: Compilation failure[ERROR] /Users/ethan/IdeaProjects/hive/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java:[46,61] <T>emptyIterator()在com.google.commn.collect.Iterators中不是公共的; 无法从外部程序包中对其进行访问
import java.util.Collections;// 替换了如下的代码// private Iterator<List<Object>> compactedValues = Iterators.emptyIterator(); private Iterator<List<Object>> compactedValues = Collections.emptyIterator();
此问题找解决方法的过程是视频中说的方式,去API文档找报错的方法是在什么时候开始过渡停用的。整个修改的思路是差不多的,单独列出来这个是除了这个类,其它的修改基本上都是一样的方法,一样的处理方式
Iterators中不是公共的
git status刷新索引: 100% (17020/17020), 完成.位于分支 my-hive-3.1.3尚未暂存以备提交的变更: (使用 “git add <文件>…” 更新要提交的内容) (使用 “git restore <文件>…” 丢弃工作区的改动) 修改: druid-handler/pom.xml 修改: druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java 修改: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java 修改: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java 修改: pom.xml 修改: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java 修改: ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
以上是所有修改的地方,pom文件的修改主要是修改guava的版本号
修改guava的版本号

7、替换spark

先修改pom文件,替换spark的版本,spark3.x 对应的Scala版本最低是2.12,所以同时把Scala的版本也修改掉,可能有读者会问,怎么知道Scala也得变的,不要etc,自己去官方文档看一下基本的要求就知道了。

<spark.version>3.1.3</spark.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.10</scala.version><SPARK_SCALA_VERSION>2.12</SPARK_SCALA_VERSION>

然后继续在Terminal通过命令: mvn clean package -Pdist -DskipTests -Dmaven.javadoc.skip=true, 来从新编译,笔者遇到了大致三个问题

7.1 SparkCounter 中累加器的修改

这里主要是设计到spark自定义累加器实现的继承类替换问题。具体远离参考 spark 累加器学习随笔

7.2 ShuffleWriteMetrics.java 报错

metrics.shuffleWriteMetrics().ShuffleBytesWritten()不存在,从名称看类似的方法为bytesWritten()metrics.shuffleWriteMetrics().ShuffleWriteTime()同样不存在,修改为writeTime()
public ShuffleWriteMetrics(TaskMetrics metrics) { // metrics.shuffleWriteMetrics().ShuffleBytesWritten()不存在,从名称看类似的方法为bytesWritten() // metrics.shuffleWriteMetrics().ShuffleWriteTime()同样不存在,修改为writeTime() this(metrics.shuffleWriteMetrics().bytesWritten(), metrics.shuffleWriteMetrics().writeTime()); }
ShuffleBytesWritten()不存在

7.3 找不到spark_project

编译报错提示org.spark_project.guava····· 不存在,找到相关报错的java文件,修改import org.spark_project.guava.collect.Sets;为 import org.sparkproject.guava.collect.Sets;。保存后,重新安装依赖