spark exactly once 语义随笔

Spark具备很好的机制来保证exactly once的语义,具体体现在数据源的可重放性、计算过程中的容错性、以及写入存储介质时的幂等性或者事务性。exactly once指的是在处理数据的过程中,系统有很好的容错性(fault-tolerance),能够保证数据处理不重不丢,每一条数据仅被处理一次。

语言所蕴含的意义就是语义(semantic)。简单的说,符号是语言的载体。符号本身没有任何意义,只有被赋予含义的符号才能够被使用,这时候语言就转化为了信息,而语言的含义就是语义。
语义可以简单地看作是数据所对应的现实世界中的事物所代表的概念的含义,以及这些含义之间的关系,是数据在某个领域上的解释和逻辑表示。

一、三种语义

spark exactly once 语义是spark很重要的一个概念。分布式系统中都基本上会有三种关键的语义

  • at most once:至多一次,数据可能会丢,但不会重复
  • at least once:至少一次,数据肯定不会丢失,但可能重复
  • exactly once:有且只有一次,数据不丢失不重复,且只出现一次; 该语义是最理想的,但也难以实现

Spark具备很好的机制来保证exactly once的语义,具体体现在数据源的可重放性、计算过程中的容错性、以及写入存储介质时的幂等性或者事务性。exactly once指的是在处理数据的过程中,系统有很好的容错性(fault-tolerance),能够保证数据处理不重不丢,每一条数据仅被处理一次。

一个典型的 Spark Streaming 应用程序会包含三个处理阶段:接收数据、处理汇总、输出结果。每个阶段都需要做不同的处理才能实现相应的语义

二、接收数据 – 数据源的可重放性

取决于上游数据源的特性

  • HDFS 这类支持容错的文件系统中读取文件,能够直接支持 Exactly-once 语义。
  • 上游消息系统支持 ACK(如RabbitMQ),我们就可以结合 Spark 的 Write Ahead Log 特性来实现 At-least-once 语义。
  • 非可靠的数据接收器(如 socketTextStream),当 Worker 或 Driver 节点发生故障时就会产生数据丢失,提供的语义也是未知的。
  • Kafka 消息系统是基于偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 语义

SparkStreaming接入Kafka的数据有两种模式

一种为Receiver模式,一种为Direct模式。

Receiver模式

Receiver模式都不能够实现exactly once的语义,其根本原因是Kafka自己管理的offset与SparkStreaming实际处理数据的offset没有同步导致的。

Receiver模式采用Kafka的高阶consumer API,Kafka自己封装了对数据的获取逻辑,且通过Zookeeper管理offset信息,这种模式在与SparkStreaming对接时,有以下特点:

  1. Kafka中的partition数量与SparkStreaming中的并行度不是一一对应的,SparkStreaming通过创建Receiver去读取Kafka中数据,createStream()方法传入的并发参数代表的是读取Kafka中topic+partition的线程数,并不能提高SparkStreaming读取数据的并行度。
  2. Kafka自己管理offset,Receiver作为一个高层的Consumer来消费数据,其消费的偏移量(offset)由Kafka记录在Zookeeper中,一旦出现错误,那些已经标记为消费过的数据将会丢失。
  3. Receiver模式下,为了解决读取数据时的并行度问题,可以创建多个DStream,然后union起来
  4. 为了解决数据丢失的问题,可以选择开启Spark的WAL(write ahead log)机制,每次处理数据前将预写日志写入到HDFS中,如果节点出现错误,可以从WAL中恢复。这种方法其实效率低下,不仅数据冗余(Kafka中有副本机制,Spark中还要存一份),且无法保证exactly once,数据可能重复消费。

Direct模式

Spark1.3中引入了Direct模式来替代Receiver模式,它使用Kafka的Simple consumer API,由Spark应用自己管理offset信息,以达成exactly once的语义,其特点如下:

  1. Kafka中的partition与SparkStreaming中的partition一一对应,也就是SparkStreaming读取数据的并行度取决于Kafka中partition的数量。
  2. 不依赖Receiver,而是通过低阶api直接找到topic+partition的leader获取数据,并由SparkStreaming应用自己负责追踪维护消费的offset。
  3. Direct模式下,SparkStreaming应用管理offset的方法案例,其中offset依然是存放在zookeeper中,但是由应用自身来管理的,offset也可以放在Redis、MySQL、HBase中进行管理,根据具体情况进行选择。

转化或汇总 – 计算过程中的容错性

Spark RDD 本身就是一种具备容错性、不变性、以及计算确定性的数据结构。只要数据来源是可用的,且处理过程中没有副作用(Side effect),我们就能一直得到相同的计算结果;撇去Driver与Executor的高可用性不说,Spark应用内部则采用checkpoint和lineage的机制来确保容错性。

lineage

一般翻译为血统,简单来说就是RDD在转化的过程中,由于父RDD与子RDD存在依赖关系(Dependency),从而形成的lineage,也可以理解为lineage串起了RDD DAG。

RDD可以进行缓存,通过调用persist或者cache方法,将RDD持久化到内存或者磁盘中,这样缓存的RDD就可以被保留在计算节点的内存中被重用,缓存是构建Spark快速迭代的关键。

当一个RDD丢失的情况下,Spark会去寻找它的父RDD是否已经缓存,如果已经缓存,就可以通过父RDD直接算出当前的RDD,从而避免了缓存之前的RDD的计算过程,且只有丢失数据的partition需要进行重算,这样Spark就避免了RDD上的重复计算,能够极大的提升计算速度。

缓存虽然可以提升Spark快速迭代计算的速度,但是缓存是会丢失的。

checkpoint

检查点机制就是为了可以切断lineage的依赖关系,在某个重要的节点,将RDD持久化到文件系统中(一般选择HDFS),这样就算之前的缓存已经丢失了,也可以保证检查点数据不会丢失,这样在恢复的时候,会直接从检查点的数据开始进行计算,检查点机制在SparkStreaming这种流式计算中发挥的作用会更大。

可以通过以下源码为入口进一步了解Spark的缓存和检查点机制,RDD在进行计算的时候会调用其iterator方法,在该方法中会首先去读取缓存的数据,如果没有缓存的数据则会去读取checkpoint的数据

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

Spark在计算过程中采用的lineage和checkpoint机制相互结合,取长补短,再加上Spark各个组件底层本身就是具有高可用性,所以在Spark应用在转化计算的过程中,可是保证数据处理的exactly once。

结果输出 – 写入存储介质的幂等性或事务性

幂等更新

  • 多次写入会产生相同的结果数据,我们可以认为这类写入操作是幂等的;saveAsTextFile 就是一种典型的幂等写入
  • 幂等写入只适用于 Map-only 型的计算流程,即没有 Shuffle、Reduce、Repartition 等操作
  • 对 Kafka DStream 做一些额外设置:
    1. 将 enable.auto.commit 设置为 false。默认情况下,Kafka DStream 会在接收到数据后立刻更新自己的偏移量,我们需要将这个动作推迟到计算完成之后。
    2. 打开 Spark Streaming 的 Checkpoint 特性,用于存放 Kafka 偏移量。但若应用程序代码发生变化,Checkpoint 数据也将无法使用,这就需要改用在数据输出之后手动提交 Kafka 偏移量。HasOffsetRanges 类,以及 commitAsync API 可以做到这一点
      messages.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition { iter =>
          // output to database
        }
        messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }

事务更新

  • 务型写入时,我们需要生成一个唯一 ID,这个 ID 可以使用当前批次的时间、分区号、或是 Kafka 偏移量来生成
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
  • 通常会在 foreachPartition 方法中来执行数据库写入操作。对于 Map-only 流程来说是适用的
  • 这种流程下 Kafka 分区和 RDD 分区是一一对应的,我们可以用以下方式获取各分区的偏移量:
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val offsetRange = offsetRanges(TaskContext.get.partitionId)
  }
}
  • 对于包含 Shuffle 的计算流程(如上文的错误日志统计),我们需要先将处理结果拉取到 Driver 进程中,然后才能执行事务操作;如果偏移量写入失败,或者重复处理了某一部分数据(offset != $fromOffset 判断条件不通过),该事务就会回滚,从而做到 Exactly-once。
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val result = processLogs(rdd).collect() // parse log and count error
  DB.localTx { implicit session =>
    result.foreach { case (time, count) =>
      // save to error_log table
    }
    offsetRanges.foreach { offsetRange =>
      val affectedRows = sql"""
      update kafka_offset set offset = ${offsetRange.untilOffset}
      where topic = ${topic} and `partition` = ${offsetRange.partition}
      and offset = ${offsetRange.fromOffset}
      """.update.apply()

      if (affectedRows != 1) {
        throw new Exception("fail to update offset")
      }
    }
  }
}

exactly once固然是个理想的状态,但其实现成本也是非常高的,在对数据可靠性要求不是很高的场景中,at-least-once甚至丢失少量数据也是可以作为一个选项考虑的

总结

官方已经不再推荐使用Receiver模式

Receiver接收方式

  • 多个Receiver接受数据效率高,但有丢失数据的风险
  • 开启日志(WAL)可防止数据丢失,但写两遍数据效率低。
  • Zookeeper维护offset有重复消费数据可能。
  • 使用高层次的API

Direct直连方式

  • 不使用Receiver,直接到kafka分区中读取数据
  • 不使用日志(WAL)机制
  • Spark自己维护offset
  • 使用低层次的API

hive compile 源码编译学习随笔

编译的源码来源于什么地方?如何知道互相兼容的版本号?用什么工具编译?和辅助的工具有哪些?编译有bug时如何快速定位到问题? 如果解决问题;我们第一点的基本思路是: ”先明确hive的版本,同时希望这个版本的hive去兼容某个版本的Hadoop“

1、一些问题

  • 编译的源码来源于什么地方?
  • 如何知道互相兼容的版本号?
  • 用什么工具编译?和辅助的工具有哪些
  • 编译有bug时如何快速定位到问题? 如果解决问题

2、编译的版本

Hadoop: 3.1.3
hive : 3.1.3
spark: 3.1.3

* 为什么都选择3.1.3,在hive 的官方地址,是有如下图说明的,那我们就默认hive3.1.3和hadoop3.1.3是完全兼容的(实际上是有bug的);以这两个版本的Hadoop和hive搭建的集群在启动hive的时候会报兼容性错误。

hive文档说明

实际上,我们要编译的组件一般不会是hadoop ,多数情况是hive,spark等组件,主要是因为这些组件如果基于Hadoop生态的运行的话,那都是这些组件去兼容Hadoop,故我们这里是主要编译hive。

也就是我们第一点的基本思路是: ”先明确hive的版本,同时希望这个版本的hive去兼容某个版本的Hadoop

3、寻找源码

源码基本上都是去GitHub上的maven仓库里找

地址:https://github.com/apache/hive 或者 https://gitee.com/apache/hive

此地址看到的hive的master分支,我们可以点击master的下拉框,然后输入我们的版本号,注意我们要编译的是具体的版本不是分支,所以要搜索tags,搜索后如下图。这个时候其实主要是搜一下看看有没有我们要编译的版本以及该版本下对应的其它组件版本号

切换到该版本后,找pom.xml文件,点击打开;我们可以直接在打开的页面搜索 hadoop ,多找一下就会发现hive3.1.3对应的Hadoop版本其实是 <hadoop.version>3.1.0</hadoop.version>,也就是我们之前所说的,如果把hive3.1.3放在hadoop3.1.3上搭建环境,大概率会出现问题的。

github选择hive3.1.3

4、配置环境

  • MacBook Pro (13-inch, M1, 2020), macOS Monterey
  • java: jdk1.8.0_212
  • maven: Apache Maven 3.8.6
  • IntelliJ IDEA 2022.2.2 (Community Edition)
  • git version 2.37.3

4.1 idea git 插件下载源码

  • idea 打开后 通过 Get from CVS的方式,填入github上hive的地址
  • 下载后在idea的右下角master的地方点一下,然后点击 。 checkout tag or version,输入3.1.3我们需要的版本点击确定。则branch就切换到该版本了。
  • 无论是 master分支还是我们想要切换的分支遇到提示 。 reload maven projects 的弹出框都点击yes
  • 此时刷新maven插件,基本上大概率是会报错的。不管是 pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde 和ldap-client-api:pom的哪一个错误,我们去搜索资料都能找到一些解决方案,但是基本上都是不可用的。
  • 为什么说报错在此篇文章书写时没法解决,主要是因为maven的问题,maven此时是最新版,而且从3.8.1开始maven block掉了所有HTTP协议的repositories,所以针对前边提到的各种错误,不管搜到什么答案,大概率都是没法有效解决的。笔者的处理方案是手动下载一个maven的tar包,配置一个maven3.6.3的版本,也就是次新的大版本
  • 刷新maven插件,基本上不会报错了;在Terminal输入 。 mvn clean package -Pdist -DskipTests -Dmaven.javadoc.skip=true,回车编译来验证当前的3.1.3版本是否有问题。

4.2 遇到的错误

Could not find artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde in nexus-aliyun
网上多数说是增加阿里云的spring- plugin的依赖,但是实际上没有效果,主要是此时的spring- plugin依赖里也没有这个jar包,需要增加spring自身的仓库支持
https://www.codeleading.com/article/91364854522/
5.1.5-jhyde
Failure to find org.glassfish:javax.el:pom:
此问题可以在此处不关注它,
glassfish:javax.el:pom
idea maven 一直报错“Could not transfer artifact
也可以不关注此问题,但是有专门的此问题场景解决方案
https://blog.51cto.com/u_15162069/2804511
Could not transfer artifact
Blocked mirror for repositories
最新版本的maven block掉了所有HTTP协议的repositories,仅支持https;maven次新大版本是3.6.3,是不会出现这个问题的。所以手动降版本,这也一下解决好几个问题
https://www.jianshu.com/p/274a45a2db05https://blog.csdn.net/qq_41980563/article/details/122061818
Blocked mirror for repositories

5、修改Hadoop版本号及处理报错

  • 视频教程里 : https://www.bilibili.com/video/BV1x14y177Ab?p=7&vd_source=b2e82cf2c96f0ecd7432e09d95d7d8ec 直接在集群中调用hive触发里问题,从而快速的找到了主要兼容的jar包,但是一般我们习惯性直接将hive源码的pom文件hadoop设置成我们希望的版本 3.1.3,直接编译,笔者尝试过,直接改 Hadoop3.1.2 这样编译时不出现问题。
  • 笔者并没有完全直接去修改主要的兼容报错的jar包, 在编译时遇到了各种报错,笔者没有采用视频摘樱桃的方式,而是逐一来修改涉及到的代码
  • java的某个类或者方法开始计算停用的时候会在官方API文档中开始做上 。 Deprecated 标记,有这个单词的方法说明官方在通知各位这个方法要做好放弃的准备了,它一般也同时给出解决方案和停用大概时间

6、修改的java文件及主要问题

  • 组件版本升级时涉及大到代码适配的工作,主要就是针对代码中的错误,去定位对应包中类的方法是在什么时候做出来了改变,而官方的API文档中都会有相应的痕迹,耐心的去文档里大版本的对应着找一找就可以找到了。(所谓大版本,就是类似于1.0,2.0,而中间的1.01,1.02 这些可以暂时跳过,除非某两个大版本相应的方法直接发生了变化,这时需要细化中间的小版本来看一下具体的情况,但一般都是大版本发布某方法要被弃用了,替换了的声明)
  • 有时候,你不需要去懂怎么回事,只要找到关键位置,照葫芦画瓢的做就好了
  • 其实要修改的还有很多地方,因为hive有很多的 module,每个module都可能指定不同的版本,而只有在某些特定场运行时的场景下才可能触发bug,但是现在编译确实是能通过。其它具体的问题就得去进一步慢慢验证修复
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.6.1:compile (default-compile) on project hive-druid-handler: Compilation failure[ERROR] /Users/ethan/IdeaProjects/hive/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java:[46,61] <T>emptyIterator()在com.google.commn.collect.Iterators中不是公共的; 无法从外部程序包中对其进行访问
import java.util.Collections;// 替换了如下的代码// private Iterator<List<Object>> compactedValues = Iterators.emptyIterator(); private Iterator<List<Object>> compactedValues = Collections.emptyIterator();
此问题找解决方法的过程是视频中说的方式,去API文档找报错的方法是在什么时候开始过渡停用的。整个修改的思路是差不多的,单独列出来这个是除了这个类,其它的修改基本上都是一样的方法,一样的处理方式
Iterators中不是公共的
git status刷新索引: 100% (17020/17020), 完成.位于分支 my-hive-3.1.3尚未暂存以备提交的变更: (使用 “git add <文件>…” 更新要提交的内容) (使用 “git restore <文件>…” 丢弃工作区的改动) 修改: druid-handler/pom.xml 修改: druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java 修改: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java 修改: llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java 修改: llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java 修改: pom.xml 修改: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java 修改: ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
以上是所有修改的地方,pom文件的修改主要是修改guava的版本号
修改guava的版本号

7、替换spark

先修改pom文件,替换spark的版本,spark3.x 对应的Scala版本最低是2.12,所以同时把Scala的版本也修改掉,可能有读者会问,怎么知道Scala也得变的,不要etc,自己去官方文档看一下基本的要求就知道了。

<spark.version>3.1.3</spark.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.10</scala.version><SPARK_SCALA_VERSION>2.12</SPARK_SCALA_VERSION>

然后继续在Terminal通过命令: mvn clean package -Pdist -DskipTests -Dmaven.javadoc.skip=true, 来从新编译,笔者遇到了大致三个问题

7.1 SparkCounter 中累加器的修改

这里主要是设计到spark自定义累加器实现的继承类替换问题。具体远离参考 spark 累加器学习随笔

7.2 ShuffleWriteMetrics.java 报错

metrics.shuffleWriteMetrics().ShuffleBytesWritten()不存在,从名称看类似的方法为bytesWritten()metrics.shuffleWriteMetrics().ShuffleWriteTime()同样不存在,修改为writeTime()
public ShuffleWriteMetrics(TaskMetrics metrics) { // metrics.shuffleWriteMetrics().ShuffleBytesWritten()不存在,从名称看类似的方法为bytesWritten() // metrics.shuffleWriteMetrics().ShuffleWriteTime()同样不存在,修改为writeTime() this(metrics.shuffleWriteMetrics().bytesWritten(), metrics.shuffleWriteMetrics().writeTime()); }
ShuffleBytesWritten()不存在

7.3 找不到spark_project

编译报错提示org.spark_project.guava····· 不存在,找到相关报错的java文件,修改import org.spark_project.guava.collect.Sets;为 import org.sparkproject.guava.collect.Sets;。保存后,重新安装依赖

spark Accumulator AccumulatorV2 累加器学习随笔

spark Accumulator AccumulatorV2 累加器是Spark的核心数据结构之一 — Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写),累加器在不同的spark版本中有不一样的具体实现逻辑

spark累加器图示

spark Accumulator AccumulatorV2 累加器是Spark的核心数据结构之一 — Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写),累加器在不同的spark版本中有不一样的具体实现逻辑;而累加器的基本逻辑过程如下

  1. 自定义变量在Spark中运算时,会从Driver中复制一份副本到Executor中运算,但变量的运算结果并不会返回给Driver,所以无法实现自定义变量的值改变,一直都是初始值,所以针对这个问题,引入了累加器的概念;
  2. 系统累加器longAccumulator和自定义累加器(extends AccumulatorV2[类型,类型])实际都是两步,new累加器,然后sc.register注册累加器;
  3. 先在Driver程序中创建一个值为0或者空的累加器对象,Task运算时,Executor中会copy一份累加器对象,在Executor中进行运算,累加器的运算结果返回给Driver程序并合并Merge,得出累加器最终结果
  4. 累加器.add(元素);具体对元素的操作包括数据sum、增加、删减、筛选等要求,都可以写在自定义累加器的.add()方法中。

Spark API

  • spark API的地址都可以在该网址中找到: https://spark.apache.org/docs/
  • 点击想要看到的版本,页面打导航栏有 API Docs ,点击想要了解的语言名称即可,例如点击 Scala,则进入对应的API地址 : https://spark.apache.org/docs/2.0.0/api/scala/index.html#org.apache.spark.package
  • 我们查看具体的类和方法的时候就特别注意一个单词 【Deprecated】,它有时候会出现类声明的最开始 Annotations,或者直接在具体的方法说明中出现。当出现这个单词的时候就意味着这个类或者方法在以后的版本中要慢慢被弃用或者替代

Accumulator 和 AccumulatorParam

Spark1.x 中实现累加器需要用到类 Accumulator和 AccumulatorParam。以spark1.6.3为例,内置数值类型的累加器用Accumulator类,而自定义累加器需要继承接口AccumulatorParam ,并实现相应的方法,而在2.0版本之后这个方式开始不再推荐使用了。

    // 在类的声明中出现了如下的说明,也就是该类将被 AccumulatorV2 所替代。
    Annotations @deprecated
    Deprecated (Since version 2.0.0) use AccumulatorV2
    trait AccumulatorParam[T] extends AccumulableParam[T, T]

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

AccumulatorV2

从spark2.0开始自定义累加器的实现不再提倡使用AccumulatorParam, 而是使用AccumulatorV2, 自定义类继承AccumulatorV2,并重写其固定的几个方法

  • reset:用于重置累加器为0
  • add:用于向累加器加一个值
  • merge:用于合并另一个同类型的累加器到当前累加器
  • copy():创建此累加器的新副本
  • isZero():返回该累加器是否为零值
  • value():获取此累加器的当前值
def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("Application")
        //构建Spark上下文对象
        val sc = new SparkContext(conf)

        //创建累加器
        val sum = new MyAccumulator()

        //注册累加器
        sc.register(sum,"accumulator")

        val rdd = sc.makeRDD(Array(1,2,3,4,5))

        rdd.map(item=>{
            sum.add(item)
        }).collect()
        println("sum = "+sum.value)

        //释放资源
        sc.stop()
    }

//自定义累加器
class MyAccumulator extends AccumulatorV2[Int,Int]{
    var sum = 0

    //1. 是否初始状态(sum为0表示累加器为初始状态)
    override def isZero: Boolean = sum == 0

    //2. 执行器执行时需要拷贝累加器对象(把累加器对象序列化后,从Driver传到Executor)
    override def copy(): AccumulatorV2[Int,Int] = {
        val mine = new MyAccumulator
        mine
    }

    //3. 重置数据(重置后看当前累加器是否为初始状态)
    override def reset(): Unit = sum = 0

    //累加数据
    override def add(v: Int): Unit = {
        sum = sum + v
    }

    //合并计算结果数据(把所有Executor中累加器value合并)
    override def merge(other: AccumulatorV2[Int, Int]): Unit = {
        sum = sum + other.value
    }

    //累加器的结果
    override def value: Int = sum
}