spark Accumulator AccumulatorV2 累加器学习随笔

spark Accumulator AccumulatorV2 累加器是Spark的核心数据结构之一 — Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写),累加器在不同的spark版本中有不一样的具体实现逻辑

spark累加器图示

spark Accumulator AccumulatorV2 累加器是Spark的核心数据结构之一 — Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写),累加器在不同的spark版本中有不一样的具体实现逻辑;而累加器的基本逻辑过程如下

  1. 自定义变量在Spark中运算时,会从Driver中复制一份副本到Executor中运算,但变量的运算结果并不会返回给Driver,所以无法实现自定义变量的值改变,一直都是初始值,所以针对这个问题,引入了累加器的概念;
  2. 系统累加器longAccumulator和自定义累加器(extends AccumulatorV2[类型,类型])实际都是两步,new累加器,然后sc.register注册累加器;
  3. 先在Driver程序中创建一个值为0或者空的累加器对象,Task运算时,Executor中会copy一份累加器对象,在Executor中进行运算,累加器的运算结果返回给Driver程序并合并Merge,得出累加器最终结果
  4. 累加器.add(元素);具体对元素的操作包括数据sum、增加、删减、筛选等要求,都可以写在自定义累加器的.add()方法中。

Spark API

  • spark API的地址都可以在该网址中找到: https://spark.apache.org/docs/
  • 点击想要看到的版本,页面打导航栏有 API Docs ,点击想要了解的语言名称即可,例如点击 Scala,则进入对应的API地址 : https://spark.apache.org/docs/2.0.0/api/scala/index.html#org.apache.spark.package
  • 我们查看具体的类和方法的时候就特别注意一个单词 【Deprecated】,它有时候会出现类声明的最开始 Annotations,或者直接在具体的方法说明中出现。当出现这个单词的时候就意味着这个类或者方法在以后的版本中要慢慢被弃用或者替代

Accumulator 和 AccumulatorParam

Spark1.x 中实现累加器需要用到类 Accumulator和 AccumulatorParam。以spark1.6.3为例,内置数值类型的累加器用Accumulator类,而自定义累加器需要继承接口AccumulatorParam ,并实现相应的方法,而在2.0版本之后这个方式开始不再推荐使用了。

    // 在类的声明中出现了如下的说明,也就是该类将被 AccumulatorV2 所替代。
    Annotations @deprecated
    Deprecated (Since version 2.0.0) use AccumulatorV2
    trait AccumulatorParam[T] extends AccumulableParam[T, T]

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

AccumulatorV2

从spark2.0开始自定义累加器的实现不再提倡使用AccumulatorParam, 而是使用AccumulatorV2, 自定义类继承AccumulatorV2,并重写其固定的几个方法

  • reset:用于重置累加器为0
  • add:用于向累加器加一个值
  • merge:用于合并另一个同类型的累加器到当前累加器
  • copy():创建此累加器的新副本
  • isZero():返回该累加器是否为零值
  • value():获取此累加器的当前值
def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("Application")
        //构建Spark上下文对象
        val sc = new SparkContext(conf)

        //创建累加器
        val sum = new MyAccumulator()

        //注册累加器
        sc.register(sum,"accumulator")

        val rdd = sc.makeRDD(Array(1,2,3,4,5))

        rdd.map(item=>{
            sum.add(item)
        }).collect()
        println("sum = "+sum.value)

        //释放资源
        sc.stop()
    }

//自定义累加器
class MyAccumulator extends AccumulatorV2[Int,Int]{
    var sum = 0

    //1. 是否初始状态(sum为0表示累加器为初始状态)
    override def isZero: Boolean = sum == 0

    //2. 执行器执行时需要拷贝累加器对象(把累加器对象序列化后,从Driver传到Executor)
    override def copy(): AccumulatorV2[Int,Int] = {
        val mine = new MyAccumulator
        mine
    }

    //3. 重置数据(重置后看当前累加器是否为初始状态)
    override def reset(): Unit = sum = 0

    //累加数据
    override def add(v: Int): Unit = {
        sum = sum + v
    }

    //合并计算结果数据(把所有Executor中累加器value合并)
    override def merge(other: AccumulatorV2[Int, Int]): Unit = {
        sum = sum + other.value
    }

    //累加器的结果
    override def value: Int = sum
}

flink wordcount 一步一步学习 大数据

flink wordcount 初次看到这两个词的同学可能会有些不明所以,但是一般在大数据研发有过一定深耕或者了解的童鞋会瞬间get到在表达什么意思。大数据的代码研发不论是用什么语言什么工具,在最开始的时候都会有一个经典的代码示例,词数统计。

flink wordcount 初次看到这两个词的同学可能会有些不明所以,但是一般在大数据研发有过一定深耕或者了解的童鞋会瞬间get到在表达什么意思。大数据的代码研发不论是用什么语言什么工具,在最开始的时候都会有一个经典的代码示例,词数统计。

开发及运行环境

  1. MacBook Pro (13-inch, M1, 2020) ;macOS Monterey
  2. Java 8 和 Scala 2.12
  3. 集成开发环境(IDE)使用 IntelliJ IDEA
  4. Flink1.13.0

工具安装

安装netcat

一般Mac不自带netcat,我们的代码示例中有用到改工具,需要我们自行安装netcat

brew install netcat
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/manifests/0.7.1-1
######################################################################## 100.0%
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee
==> Downloading from https://pkg-containers.githubusercontent.com/ghcr1/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee?se=2022-09-17T10%3A40%3A00Z&sig=xlGiGyvIHi8peoADPzotAT
######################################################################## 100.0%
==> Pouring netcat--0.7.1.monterey.bottle.1.tar.gz
?  /usr/local/Cellar/netcat/0.7.1: 13 files, 135.7KB

IDE 和 Maven等工具

Mac m1 intellij idea gradle 通过brew快速安装配置及工具

代码示例

本地输入文件


hello world
hello flink
hello spark
hi hive
hi big data


flink早起版本批处理

基于DataSet[String] 的数据集处理

import org.apache.flink.api.scala._

object BatchWordCount {

  def main(args: Array[String]): Unit = {
    // 创建环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 读取文件
    val line_data_set = env.readTextFile("data/words.txt")
    // 处理数据 打散
    val words = line_data_set.flatMap(_.split(" ").map(x => (x, 1)))
    // 分组
    words.groupBy(0).sum(1).print
  }
}

flink 有界数据批处理

基于DataStream[String]的流数据处理

object BoundedStreamWordCount {
  def main(args: Array[String]): Unit = {

    // 创建流式处理环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    val value = environment.readTextFile("data/words.txt")
    // 处理数据 打散
    val words = value.flatMap(_.split(" ")).map(x => (x, 1))
    words.print()
    // 分组
    val value1 = words.keyBy(_._1)
    value1.sum(1).print

    environment.execute()
  }

  /**
   *  并行编号    结果
   *  4> (hello,1)
      8> (hi,1)
      1> (hi,1)
      3> (hello,1)
      6> (hello,1)
      6> (spark,1)
      3> (world,1)
      1> (big,1)
      4> (flink,1)
      1> (data,1)
      8> (hive,1)
      7> (flink,1)
      5> (world,1)
      2> (big,1)
      1> (hive,1)
      3> (hi,1)
      2> (data,1)
      3> (hi,2)
      3> (hello,1)
      3> (hello,2)
      1> (spark,1)
      3> (hello,3)
   */
}

flink 无界数据流式处理

启动netcat

nc -lk 5678
hello world
hello scala

运行如下代码

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建流式处理环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    val value = environment.socketTextStream("localhost",5678)
    // 处理数据 打散
    val words = value.flatMap(_.split(" ")).map(x => (x, 1))
    words.print()
    // 分组
    val value1 = words.keyBy(_._1)
    value1.sum(1).print

    environment.execute()
  }

    3> (hello,1)
    3> (world,1)
    5> (world,1)
    3> (hello,1)
    4> (hello,1)
    4> (scala,1)
    1> (scala,1)
    3> (hello,2)

flink 一步一步学习

flink是一个用于对无界和有界数据流进行有状态计算的框架和分布式处理引擎,被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算, 即它是一个持高吞吐、低延迟、高性能的分布式处理框架。是Apache软件基金会的顶级项目

flink图标

flink是什么?

对于技术学习,无论是什么语言什么工具,对其介绍或者说学习最主要的途径就是官方网站和主要文档。其它任何地方的信息都是其官方网站的信息挪用。对flink的介绍在其官网上有下边一段话。

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

大概翻译一下: flink是一个用于对无界和有界数据流进行有状态计算的框架和分布式处理引擎,被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算, 即它是一个持高吞吐、低延迟、高性能的分布式处理框架

  • 无界数据流:无界数据流有一个开始但是没有结束
  • 有界数据流:有界数据流有明确定义的开始和结束

flink的特点

用一句话来概述特点就是,flink基于数据流的状态计算。而在其官网地址打开时有一张图也可以具体的体现其特点。事件驱动、流处理、流批一体
flink的基本特点介绍

flink VS spark

在spark中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的,本质是RDD,是数据集合,是有界的。
在flink中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。

友情链接

  1. apache hive 大数据的ETL工具/ 数据仓库
  2. spark是什么?在大数据圈子里的地位如何
  3. Spark的RDD中的action(执行)和transformation(转换)两种操作
  4. hadoop distcp 分布式拷贝命令基本介绍