spark Accumulator AccumulatorV2 累加器学习随笔

spark Accumulator AccumulatorV2 累加器是Spark的核心数据结构之一 — Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写),累加器在不同的spark版本中有不一样的具体实现逻辑

spark累加器图示

spark Accumulator AccumulatorV2 累加器是Spark的核心数据结构之一 — Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写),累加器在不同的spark版本中有不一样的具体实现逻辑;而累加器的基本逻辑过程如下

  1. 自定义变量在Spark中运算时,会从Driver中复制一份副本到Executor中运算,但变量的运算结果并不会返回给Driver,所以无法实现自定义变量的值改变,一直都是初始值,所以针对这个问题,引入了累加器的概念;
  2. 系统累加器longAccumulator和自定义累加器(extends AccumulatorV2[类型,类型])实际都是两步,new累加器,然后sc.register注册累加器;
  3. 先在Driver程序中创建一个值为0或者空的累加器对象,Task运算时,Executor中会copy一份累加器对象,在Executor中进行运算,累加器的运算结果返回给Driver程序并合并Merge,得出累加器最终结果
  4. 累加器.add(元素);具体对元素的操作包括数据sum、增加、删减、筛选等要求,都可以写在自定义累加器的.add()方法中。

Spark API

  • spark API的地址都可以在该网址中找到: https://spark.apache.org/docs/
  • 点击想要看到的版本,页面打导航栏有 API Docs ,点击想要了解的语言名称即可,例如点击 Scala,则进入对应的API地址 : https://spark.apache.org/docs/2.0.0/api/scala/index.html#org.apache.spark.package
  • 我们查看具体的类和方法的时候就特别注意一个单词 【Deprecated】,它有时候会出现类声明的最开始 Annotations,或者直接在具体的方法说明中出现。当出现这个单词的时候就意味着这个类或者方法在以后的版本中要慢慢被弃用或者替代

Accumulator 和 AccumulatorParam

Spark1.x 中实现累加器需要用到类 Accumulator和 AccumulatorParam。以spark1.6.3为例,内置数值类型的累加器用Accumulator类,而自定义累加器需要继承接口AccumulatorParam ,并实现相应的方法,而在2.0版本之后这个方式开始不再推荐使用了。

    // 在类的声明中出现了如下的说明,也就是该类将被 AccumulatorV2 所替代。
    Annotations @deprecated
    Deprecated (Since version 2.0.0) use AccumulatorV2
    trait AccumulatorParam[T] extends AccumulableParam[T, T]

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

AccumulatorV2

从spark2.0开始自定义累加器的实现不再提倡使用AccumulatorParam, 而是使用AccumulatorV2, 自定义类继承AccumulatorV2,并重写其固定的几个方法

  • reset:用于重置累加器为0
  • add:用于向累加器加一个值
  • merge:用于合并另一个同类型的累加器到当前累加器
  • copy():创建此累加器的新副本
  • isZero():返回该累加器是否为零值
  • value():获取此累加器的当前值
def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("Application")
        //构建Spark上下文对象
        val sc = new SparkContext(conf)

        //创建累加器
        val sum = new MyAccumulator()

        //注册累加器
        sc.register(sum,"accumulator")

        val rdd = sc.makeRDD(Array(1,2,3,4,5))

        rdd.map(item=>{
            sum.add(item)
        }).collect()
        println("sum = "+sum.value)

        //释放资源
        sc.stop()
    }

//自定义累加器
class MyAccumulator extends AccumulatorV2[Int,Int]{
    var sum = 0

    //1. 是否初始状态(sum为0表示累加器为初始状态)
    override def isZero: Boolean = sum == 0

    //2. 执行器执行时需要拷贝累加器对象(把累加器对象序列化后,从Driver传到Executor)
    override def copy(): AccumulatorV2[Int,Int] = {
        val mine = new MyAccumulator
        mine
    }

    //3. 重置数据(重置后看当前累加器是否为初始状态)
    override def reset(): Unit = sum = 0

    //累加数据
    override def add(v: Int): Unit = {
        sum = sum + v
    }

    //合并计算结果数据(把所有Executor中累加器value合并)
    override def merge(other: AccumulatorV2[Int, Int]): Unit = {
        sum = sum + other.value
    }

    //累加器的结果
    override def value: Int = sum
}

Spark共享变量

spark提共的两种共享变量(广播变量和累加器)。至于为什么要共享变量是与spark的原理机制有关:通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传,也就是说有结果Driver程序是拿不到的!共享变量主要是为了解决这个问题。

1、广播变量

broadcast
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello xasxt")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
sc.stop()

注意事项:
问:能不能将一个RDD使用广播变量广播出去?
答:不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
广播变量只能在Driver端定义,不能在Executor端定义。
在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

2、累加器

累加器只能够增加。 只有driver能获取到Accumulator的值(使用value方法),Task(excutor)只能对其做增加操作(使用 +=)。

def accumulator[T](initialValue: T,name: String)(implicit param: org.apache.spark.AccumulatorParam[T]): org.apache.spark.Accumulator[T] 

第一个参数应是数值类型,是累加器的初始值,第二个参数是该累加器的命字,这样就会在spark web ui中显示,可以帮助你了解程序运行的情况。

val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0)
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()

注意事项:
累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新。

关于累加器多说一些,先看一个错误的例子:

val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1
val newData = data.map{x => {
  if(x%2 == 0){
    accum += 1
      0
    }else 1
}}
//使用action操作触发执行
newData.count
//此时accum的值为5,是我们要的结果
accum.value

//继续操作,查看刚才变动的数据,foreach也是action操作
newData.foreach(println)
//上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了
//这并不是我们想要的结果
accum.value

spark中的一系列transform操作会构成DAG,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。
所以在第一次count(action操作)之后,我们发现累加器的数值变成了5,是我们要的答案。
之后又对新产生的的newData进行了一次foreach(action操作),其实这个时候又执行了一次map(transform)操作,所以累加器又增加了5。最终获得的结果变成了10。

这种问题如何解决呢?

看了上面的分析,大家都有这种印象了,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。事实上,还是有解决方案的,只要将任务之间的依赖关系切断就可以了。

什么方法有这种功能呢?

你们肯定都想到了,cache,persist。调用这个方法的时候会将之前的依赖切除,后续的累加器就不会再被之前的transfrom操作影响到了。

总之,使用Accumulator时,为了保证准确性,只使用一次action操作

val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//代码和上方相同
val newData = data.map{x => {...}}
//使用cache缓存数据,切断依赖。
newData.cache.count
//此时accum的值为5
accum.value
newData.foreach(println)
//此时的accum依旧是5
accum.value