Scala 箭头符号使用场景 =>

我们在阅读或者使用Scala语言开发的程序或框架(如Spark)时经常会看到各种箭头符号,有的是单线箭头(-> | <-),而有的是等号箭头(=> ),还有的是双单线箭头(–>),这些符号在Scala的世界里被称为箭头函数,而且同一个箭头函数在不同的场景下是有不同的含义,不同的使用原则的。

本文主要先介绍下等号+方向符号的箭头函数 => 的用法

1、匿名函数

var mul = (x: Int, y: Int) => x + y
println(mul(3, 4)) //7

模式匹配(match…case)

  def TestMatch(x: Int): String = x match {
    case 1 => "one"
    case 2 => "two"
    case _ => "others"
  }
  println(TestMatch(3))

Scala 箭头函数 => By-Name Parameters

Scala 中允许使用高阶函数, 高阶函数可以使用其他函数作为参数,或者使用函数作为输出结果。通常情况下,函数的参数是传值参数;即参数的值在它被传递给函数之前被确定。但是,如果我们需要编写一个接收参数不希望马上计算,直到调用函数内的表达式才进行真正的计算的函数。对于这种情况,Scala提供按名称参数调用函数。

闲话少说,先撸代码:

object Tests {
  def main(args: Array[String]): Unit = {
    val a = stepOne(param())
    println(s"stepOne 返回的结果: --> ${a}")

  }
  def param(): Long ={
    println("进入param函数!每次调用都触发")
    System.nanoTime()
  }

  def stepOne(x: => Long): Long ={
    println("进入函数stepOne!")
    println(s"先调用参数x --> ${x} 打印出来")
    x
  }
}
Scala箭头函数 By-Name Parameters

从上图中我们看出来整个代码的执行顺序。在代码中,如果定义函数的时候,传入参数不是传入的值,而是传入的参数名称(如代码中使用x: => Long而不是x: Long),在调用该函数时,不会立即执行和参数有关的计算,而是到参数真正使用到的时候才进行计算。

使用“按名称传递参数”方式的优点是:1.减少不必要的计算; 2.减少异常。

Spark共享变量

spark提共的两种共享变量(广播变量和累加器)。至于为什么要共享变量是与spark的原理机制有关:通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传,也就是说有结果Driver程序是拿不到的!共享变量主要是为了解决这个问题。

1、广播变量

broadcast
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello xasxt")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
sc.stop()

注意事项:
问:能不能将一个RDD使用广播变量广播出去?
答:不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
广播变量只能在Driver端定义,不能在Executor端定义。
在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

2、累加器

累加器只能够增加。 只有driver能获取到Accumulator的值(使用value方法),Task(excutor)只能对其做增加操作(使用 +=)。

def accumulator[T](initialValue: T,name: String)(implicit param: org.apache.spark.AccumulatorParam[T]): org.apache.spark.Accumulator[T] 

第一个参数应是数值类型,是累加器的初始值,第二个参数是该累加器的命字,这样就会在spark web ui中显示,可以帮助你了解程序运行的情况。

val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0)
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()

注意事项:
累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新。

关于累加器多说一些,先看一个错误的例子:

val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1
val newData = data.map{x => {
  if(x%2 == 0){
    accum += 1
      0
    }else 1
}}
//使用action操作触发执行
newData.count
//此时accum的值为5,是我们要的结果
accum.value

//继续操作,查看刚才变动的数据,foreach也是action操作
newData.foreach(println)
//上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了
//这并不是我们想要的结果
accum.value

spark中的一系列transform操作会构成DAG,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。
所以在第一次count(action操作)之后,我们发现累加器的数值变成了5,是我们要的答案。
之后又对新产生的的newData进行了一次foreach(action操作),其实这个时候又执行了一次map(transform)操作,所以累加器又增加了5。最终获得的结果变成了10。

这种问题如何解决呢?

看了上面的分析,大家都有这种印象了,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。事实上,还是有解决方案的,只要将任务之间的依赖关系切断就可以了。

什么方法有这种功能呢?

你们肯定都想到了,cache,persist。调用这个方法的时候会将之前的依赖切除,后续的累加器就不会再被之前的transfrom操作影响到了。

总之,使用Accumulator时,为了保证准确性,只使用一次action操作

val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//代码和上方相同
val newData = data.map{x => {...}}
//使用cache缓存数据,切断依赖。
newData.cache.count
//此时accum的值为5
accum.value
newData.foreach(println)
//此时的accum依旧是5
accum.value