spark dataframe select 源码 对源码的小小窥探

spark dataframe 和 dataset 都是spark的数据集合抽象,而dataframe和dataset的关系可以用一行代码来说明,dataframe就是dataset泛化成Row的一种特例。当然本篇的重点不是为了讨论这两个数据集,而是通过对dataframe的select方法源码阅读来给读者提供一个队源码轻读的思路。

DataFrame = Dataset[Row]

dataframe的 select是一个Transformation,这个也不是我们的讨论重点,加上这句话纯粹为本站内部引流

spark dataframe select is a transformation

源码的版本gradle dependencies

dependencies {
    compile 'org.scala-lang:scala-library:2.11.8'
    compileOnly 'org.apache.spark:spark-core_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-sql_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-mllib_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-streaming_2.11:2.3.0'
}

源码内容

  /**
   * Selects a set of column based expressions.
   * {{{
   *   ds.select($"colA", $"colB" + 1)
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(cols: Column*): DataFrame = withPlan {
    Project(cols.map(_.named), planWithBarrier)
  }

  /**
   * Selects a set of columns. This is a variant of `select` that can only select
   * existing columns using column names (i.e. cannot construct expressions).
   *
   * {{{
   *   // The following two are equivalent:
   *   ds.select("colA", "colB")
   *   ds.select($"colA", $"colB")
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

  /**
   * :: Experimental ::
   * Returns a new Dataset by computing the given [[Column]] expression for each element.
   *
   * {{{
   *   val ds = Seq(1, 2, 3).toDS()
   *   val newDS = ds.select(expr("value + 1").as[Int])
   * }}}
   *
   * @group typedrel
   * @since 1.6.0
   */
  @Experimental
  @InterfaceStability.Evolving
  def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
    implicit val encoder = c1.encoder
    val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
      planWithBarrier)

    if (encoder.flat) {
      new Dataset[U1](sparkSession, project, encoder)
    } else {
      // Flattens inner fields of U1
      new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1)
    }
  }

scala.annotation.varargs

@scala.annotation.varargs — Var-Args函数(方法)意味着采用可变数量的参数。

Var-Args Functions(Methods) means taking Variable Number of Arguments. As a Java Developer, We have already tasted usage of Var-Args methods in Java-based Applications.

Scala also follows similar kind of syntax to define Var-Args parameter. But it uses * to define Var-Args parameter.–>Scala还遵循类似的语法来定义Var-Args参数。 但是它使用*定义Var-Args参数。

scala 可变参数

annotation 注解

advanced usages: 高级用法; experimental :实验性
  1. 这个包的作用在包的注释中已经做了最好的说明,标记一些api是实验性 或者 声明一些高级用法.
  2. 它主要利用java.lang.annotation来声明了一些自身的注解类或者接口
  3. public class InterfaceStability, 其中有三个接口分别是:
    (a). public @interface Stable {}; //Stable APIs
    (b). public @interface Evolving {}; //APIs that are meant to evolve towards becoming stable APIs, but are not stable APIs yet
    (c). public @interface Unstable {}; // Unstable APIs, with no guarantee on stability.

强弱类型

@group untypedrel 弱类型转换 | @group typedrel 强类型转换

在Saprk的结构化API中,可以分成两类,“无类型(untyped)”的DataFrame API和“类型化(typed)”的Dataset API.
确切的说Dataframe并不是”无类型”的, 它们有类型,只是类型检查没有那么严格,只检查这些类型是否在 运行时(run-time) 与schema中指定的类型对齐。
Dataset在 编译时(compile-time) 就会检查类型是否符合规范。
Dataset API仅适用于 基于JVM的语言(Scala和Java)。我们可以使用Scala 中的case class或Java bean来进行类型指定。

简单来说,如果转换是弱类型的,它将返回一个Dataframe(确切的说弱类型转换的返回类型还有 Column, RelationalGroupedDataset, DataFrameNaFunctions 和 DataFrameStatFunctions 等),而强类型转换返回的是一个Dataset。

星号的作用

select(cols: Column*) 参数带星号,在调用方式可以采用 冒号下划线星的方式把列表整个传入到方法中,如

select(cols_list.head, cols_list.tail:_*)

泛型类

[U1] : 泛型类, 是scala 语法的 泛化, 类似于java 的尖括号 <>;而UML建模语言中,是一种继承关系,表示一般与特殊的关系,它指定了子类如何特化父类的所有特征和行为。

TypedColumn

TypedColumn  是spark.sql.Column 的 伴生类。TypedColumn is a Column for the types of the input and the output. 也就是我们需要去spark.sql.Column中去看 TypedColumn  的具体实现,而我们在这个方法中就可以看到withInputType是它的内部方法。而它具体做了什么?有什么功能不在此处详细细述。

发表回复