Scala 下划线 介绍

scala 下划线 是每个学习这门语言的童鞋必不能错过的知识点。Scala单作为一门语言来看, 非常的简洁高效,在Scala中存在很多让代码更加简洁的语法,下划线“_”便是其中一个。

  1. 作为“通配符”,类似Java中的*。如import scala.math._
  2. :_*([[冒号下划线星]])作为一个整体,告诉编译器你希望将某个参数当作参数序列处理!例如val s = sum(1 to 5:_*)就是将1 to 5当作参数序列处理。
  3. 指代一个集合中的每个元素。a.filter(_%2\==0).map(2*),表示Array a中筛出偶数,并乘以2;如要对缓冲数组ArrayBuffer b排序 val bSorted = b.sorted()
  4. 在元组中,可以用方法_1, _2, _3访问组员。如a._2。其中句点可以用空格替代
    val t = (1, 2, 3) 
    println(t._1, t._2, t._3)
  5. 使用模式匹配可以用来获取元组的组员
    // 比如上一例中val (first, second, _) = t
    val (first, second, third) = t
  6. 划线_代表的是某一类型的默认值
    //String类型的默认值为null 
    var s: String = _
    //Double类型的默认值是 0.0
    var a: Double = _
  7. 匹配集合元素, [[下划线星]]表示变长参数队列
    //匹配以0开头,长度为三的列表 
    expr match { 
    case List(0, _, _) => println("found it") 
    case _ => 
    } 
    //匹配以0开头,长度任意的列表 
    expr match { 
    case List(0, _*) => println("found it") 
    case _ => 
    } 
    //匹配元组元素 
    expr match { 
    case (0, _) => println("found it") 
    case _ => 
    } 
    //将首元素赋值给head变量 
    val List(head, _*) = List("a")
    // 默认匹配
    str match{ 
    case "1" => println("match 1") 
    case _ => println("match default") 
    }
  8. [[function literal]]简写函数字面

    数的参数在函数体内只出现一次,则可以使用下划线代替

    val f1 = (_: Int) + (_: Int) 
    //等价于 
    val f2 = (x: Int, y: Int) => x + y 
    list.foreach(println(_)) 
    //等价于 
    list.foreach(e => println(e)) 
    list.filter(_ > 0) 
    //等价于 
    list.filter(x => x > 0)
  9. 定义一元操作符

    Scala中,操作符其实就是方法,例如1 + 1等价于1.+(1);利用下划线我们可以定义自己的左置操作符

    -2 
    //等价于 
    2.unary_-
  10. 定义赋值操作符

    通过下划线实现赋值操作符

    class Foo { 
    def name = { "foo" } 
    def name_=(str: String) { 
        println("set name " + str) 
    } 
    val m = new Foo()
    m.name = "Foo" 
    //等价于: 
    m.name_=("Foo")
  11. [[partially applied function]] 定义部分应用函数

    可以为某个函数只提供部分参数进行调用,返回的结果是一个新的函数,即部分应用函数

    def sum(a: Int, b: Int, c: Int) = a + b + c 
    val b = sum(1, _: Int, 3) 
    b: Int => Int = <function1> 
    b(2) //6
  12. 将方法转换成函数

    Scala中方法和函数是两个不同的概念,方法无法作为参数进行传递,也无法赋值给变量,但是函数是可以的, 在Scala中,利用下划线可以将方法转换成函数

    //将println方法转换成函数,并赋值给p 
    val p = println _ 
    //p: (Any) => Unit

spark Accumulator AccumulatorV2 累加器学习随笔

spark Accumulator AccumulatorV2 累加器是Spark的核心数据结构之一 — Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写),累加器在不同的spark版本中有不一样的具体实现逻辑

spark累加器图示

spark Accumulator AccumulatorV2 累加器是Spark的核心数据结构之一 — Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写),累加器在不同的spark版本中有不一样的具体实现逻辑;而累加器的基本逻辑过程如下

  1. 自定义变量在Spark中运算时,会从Driver中复制一份副本到Executor中运算,但变量的运算结果并不会返回给Driver,所以无法实现自定义变量的值改变,一直都是初始值,所以针对这个问题,引入了累加器的概念;
  2. 系统累加器longAccumulator和自定义累加器(extends AccumulatorV2[类型,类型])实际都是两步,new累加器,然后sc.register注册累加器;
  3. 先在Driver程序中创建一个值为0或者空的累加器对象,Task运算时,Executor中会copy一份累加器对象,在Executor中进行运算,累加器的运算结果返回给Driver程序并合并Merge,得出累加器最终结果
  4. 累加器.add(元素);具体对元素的操作包括数据sum、增加、删减、筛选等要求,都可以写在自定义累加器的.add()方法中。

Spark API

  • spark API的地址都可以在该网址中找到: https://spark.apache.org/docs/
  • 点击想要看到的版本,页面打导航栏有 API Docs ,点击想要了解的语言名称即可,例如点击 Scala,则进入对应的API地址 : https://spark.apache.org/docs/2.0.0/api/scala/index.html#org.apache.spark.package
  • 我们查看具体的类和方法的时候就特别注意一个单词 【Deprecated】,它有时候会出现类声明的最开始 Annotations,或者直接在具体的方法说明中出现。当出现这个单词的时候就意味着这个类或者方法在以后的版本中要慢慢被弃用或者替代

Accumulator 和 AccumulatorParam

Spark1.x 中实现累加器需要用到类 Accumulator和 AccumulatorParam。以spark1.6.3为例,内置数值类型的累加器用Accumulator类,而自定义累加器需要继承接口AccumulatorParam ,并实现相应的方法,而在2.0版本之后这个方式开始不再推荐使用了。

    // 在类的声明中出现了如下的说明,也就是该类将被 AccumulatorV2 所替代。
    Annotations @deprecated
    Deprecated (Since version 2.0.0) use AccumulatorV2
    trait AccumulatorParam[T] extends AccumulableParam[T, T]

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

AccumulatorV2

从spark2.0开始自定义累加器的实现不再提倡使用AccumulatorParam, 而是使用AccumulatorV2, 自定义类继承AccumulatorV2,并重写其固定的几个方法

  • reset:用于重置累加器为0
  • add:用于向累加器加一个值
  • merge:用于合并另一个同类型的累加器到当前累加器
  • copy():创建此累加器的新副本
  • isZero():返回该累加器是否为零值
  • value():获取此累加器的当前值
def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("Application")
        //构建Spark上下文对象
        val sc = new SparkContext(conf)

        //创建累加器
        val sum = new MyAccumulator()

        //注册累加器
        sc.register(sum,"accumulator")

        val rdd = sc.makeRDD(Array(1,2,3,4,5))

        rdd.map(item=>{
            sum.add(item)
        }).collect()
        println("sum = "+sum.value)

        //释放资源
        sc.stop()
    }

//自定义累加器
class MyAccumulator extends AccumulatorV2[Int,Int]{
    var sum = 0

    //1. 是否初始状态(sum为0表示累加器为初始状态)
    override def isZero: Boolean = sum == 0

    //2. 执行器执行时需要拷贝累加器对象(把累加器对象序列化后,从Driver传到Executor)
    override def copy(): AccumulatorV2[Int,Int] = {
        val mine = new MyAccumulator
        mine
    }

    //3. 重置数据(重置后看当前累加器是否为初始状态)
    override def reset(): Unit = sum = 0

    //累加数据
    override def add(v: Int): Unit = {
        sum = sum + v
    }

    //合并计算结果数据(把所有Executor中累加器value合并)
    override def merge(other: AccumulatorV2[Int, Int]): Unit = {
        sum = sum + other.value
    }

    //累加器的结果
    override def value: Int = sum
}

flink wordcount 一步一步学习 大数据

flink wordcount 初次看到这两个词的同学可能会有些不明所以,但是一般在大数据研发有过一定深耕或者了解的童鞋会瞬间get到在表达什么意思。大数据的代码研发不论是用什么语言什么工具,在最开始的时候都会有一个经典的代码示例,词数统计。

flink wordcount 初次看到这两个词的同学可能会有些不明所以,但是一般在大数据研发有过一定深耕或者了解的童鞋会瞬间get到在表达什么意思。大数据的代码研发不论是用什么语言什么工具,在最开始的时候都会有一个经典的代码示例,词数统计。

开发及运行环境

  1. MacBook Pro (13-inch, M1, 2020) ;macOS Monterey
  2. Java 8 和 Scala 2.12
  3. 集成开发环境(IDE)使用 IntelliJ IDEA
  4. Flink1.13.0

工具安装

安装netcat

一般Mac不自带netcat,我们的代码示例中有用到改工具,需要我们自行安装netcat

brew install netcat
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/manifests/0.7.1-1
######################################################################## 100.0%
==> Downloading https://ghcr.io/v2/homebrew/core/netcat/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee
==> Downloading from https://pkg-containers.githubusercontent.com/ghcr1/blobs/sha256:7c33ed98a6c81011f5923240e11b87f07add5cea280f5e2754b2f3d7fc3d9eee?se=2022-09-17T10%3A40%3A00Z&sig=xlGiGyvIHi8peoADPzotAT
######################################################################## 100.0%
==> Pouring netcat--0.7.1.monterey.bottle.1.tar.gz
?  /usr/local/Cellar/netcat/0.7.1: 13 files, 135.7KB

IDE 和 Maven等工具

Mac m1 intellij idea gradle 通过brew快速安装配置及工具

代码示例

本地输入文件


hello world
hello flink
hello spark
hi hive
hi big data


flink早起版本批处理

基于DataSet[String] 的数据集处理

import org.apache.flink.api.scala._

object BatchWordCount {

  def main(args: Array[String]): Unit = {
    // 创建环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 读取文件
    val line_data_set = env.readTextFile("data/words.txt")
    // 处理数据 打散
    val words = line_data_set.flatMap(_.split(" ").map(x => (x, 1)))
    // 分组
    words.groupBy(0).sum(1).print
  }
}

flink 有界数据批处理

基于DataStream[String]的流数据处理

object BoundedStreamWordCount {
  def main(args: Array[String]): Unit = {

    // 创建流式处理环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    val value = environment.readTextFile("data/words.txt")
    // 处理数据 打散
    val words = value.flatMap(_.split(" ")).map(x => (x, 1))
    words.print()
    // 分组
    val value1 = words.keyBy(_._1)
    value1.sum(1).print

    environment.execute()
  }

  /**
   *  并行编号    结果
   *  4> (hello,1)
      8> (hi,1)
      1> (hi,1)
      3> (hello,1)
      6> (hello,1)
      6> (spark,1)
      3> (world,1)
      1> (big,1)
      4> (flink,1)
      1> (data,1)
      8> (hive,1)
      7> (flink,1)
      5> (world,1)
      2> (big,1)
      1> (hive,1)
      3> (hi,1)
      2> (data,1)
      3> (hi,2)
      3> (hello,1)
      3> (hello,2)
      1> (spark,1)
      3> (hello,3)
   */
}

flink 无界数据流式处理

启动netcat

nc -lk 5678
hello world
hello scala

运行如下代码

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建流式处理环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    val value = environment.socketTextStream("localhost",5678)
    // 处理数据 打散
    val words = value.flatMap(_.split(" ")).map(x => (x, 1))
    words.print()
    // 分组
    val value1 = words.keyBy(_._1)
    value1.sum(1).print

    environment.execute()
  }

    3> (hello,1)
    3> (world,1)
    5> (world,1)
    3> (hello,1)
    4> (hello,1)
    4> (scala,1)
    1> (scala,1)
    3> (hello,2)