spark dataframe 和 dataset 都是spark的数据集合抽象,而dataframe和dataset的关系可以用一行代码来说明,dataframe就是dataset泛化成Row的一种特例。当然本篇的重点不是为了讨论这两个数据集,而是通过对dataframe的select方法源码阅读来给读者提供一个队源码轻读的思路。
DataFrame = Dataset[Row]
dataframe的 select是一个Transformation,这个也不是我们的讨论重点,加上这句话纯粹为本站内部引流
spark dataframe select is a transformation
源码的版本gradle dependencies
dependencies { compile 'org.scala-lang:scala-library:2.11.8' compileOnly 'org.apache.spark:spark-core_2.11:2.3.0' compileOnly 'org.apache.spark:spark-sql_2.11:2.3.0' compileOnly 'org.apache.spark:spark-mllib_2.11:2.3.0' compileOnly 'org.apache.spark:spark-streaming_2.11:2.3.0' }
源码内容
/** * Selects a set of column based expressions. * {{{ * ds.select($"colA", $"colB" + 1) * }}} * * @group untypedrel * @since 2.0.0 */ @scala.annotation.varargs def select(cols: Column*): DataFrame = withPlan { Project(cols.map(_.named), planWithBarrier) } /** * Selects a set of columns. This is a variant of `select` that can only select * existing columns using column names (i.e. cannot construct expressions). * * {{{ * // The following two are equivalent: * ds.select("colA", "colB") * ds.select($"colA", $"colB") * }}} * * @group untypedrel * @since 2.0.0 */ @scala.annotation.varargs def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*) /** * :: Experimental :: * Returns a new Dataset by computing the given [[Column]] expression for each element. * * {{{ * val ds = Seq(1, 2, 3).toDS() * val newDS = ds.select(expr("value + 1").as[Int]) * }}} * * @group typedrel * @since 1.6.0 */ @Experimental @InterfaceStability.Evolving def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = { implicit val encoder = c1.encoder val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil, planWithBarrier) if (encoder.flat) { new Dataset[U1](sparkSession, project, encoder) } else { // Flattens inner fields of U1 new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1) } }
scala.annotation.varargs
@scala.annotation.varargs — Var-Args函数(方法)意味着采用可变数量的参数。
Var-Args Functions(Methods) means taking Variable Number of Arguments. As a Java Developer, We have already tasted usage of Var-Args methods in Java-based Applications.
Scala also follows similar kind of syntax to define Var-Args parameter. But it uses * to define Var-Args parameter.–>Scala还遵循类似的语法来定义Var-Args参数。 但是它使用*定义Var-Args参数。
scala 可变参数
annotation 注解
advanced usages: 高级用法; experimental :实验性
- 这个包的作用在包的注释中已经做了最好的说明,标记一些api是实验性 或者 声明一些高级用法.
- 它主要利用java.lang.annotation来声明了一些自身的注解类或者接口
- public class InterfaceStability, 其中有三个接口分别是:
(a). public @interface Stable {}; //Stable APIs
(b). public @interface Evolving {}; //APIs that are meant to evolve towards becoming stable APIs, but are not stable APIs yet
(c). public @interface Unstable {}; // Unstable APIs, with no guarantee on stability.
强弱类型
@group untypedrel 弱类型转换 | @group typedrel 强类型转换
在Saprk的结构化API中,可以分成两类,“无类型(untyped)”的DataFrame API和“类型化(typed)”的Dataset API.
确切的说Dataframe并不是”无类型”的, 它们有类型,只是类型检查没有那么严格,只检查这些类型是否在 运行时(run-time) 与schema中指定的类型对齐。
Dataset在 编译时(compile-time) 就会检查类型是否符合规范。
Dataset API仅适用于 基于JVM的语言(Scala和Java)。我们可以使用Scala 中的case class或Java bean来进行类型指定。
简单来说,如果转换是弱类型的,它将返回一个Dataframe(确切的说弱类型转换的返回类型还有 Column, RelationalGroupedDataset, DataFrameNaFunctions 和 DataFrameStatFunctions 等),而强类型转换返回的是一个Dataset。
星号的作用
select(cols: Column*) 参数带星号,在调用方式可以采用 冒号下划线星的方式把列表整个传入到方法中,如
select(cols_list.head, cols_list.tail:_*)
泛型类
[U1] : 泛型类, 是scala 语法的 泛化, 类似于java 的尖括号 <>;而UML建模语言中,是一种继承关系,表示一般与特殊的关系,它指定了子类如何特化父类的所有特征和行为。
TypedColumn
TypedColumn
是spark.sql.Column 的 伴生类。TypedColumn
is a Column for the types of the input and the output. 也就是我们需要去spark.sql.Column中去看 TypedColumn
的具体实现,而我们在这个方法中就可以看到withInputType是它的内部方法。而它具体做了什么?有什么功能不在此处详细细述。