spark dataframe select 源码 对源码的小小窥探

spark dataframe 和 dataset 都是spark的数据集合抽象,而dataframe和dataset的关系可以用一行代码来说明,dataframe就是dataset泛化成Row的一种特例。当然本篇的重点不是为了讨论这两个数据集,而是通过对dataframe的select方法源码阅读来给读者提供一个队源码轻读的思路。

DataFrame = Dataset[Row]

dataframe的 select是一个Transformation,这个也不是我们的讨论重点,加上这句话纯粹为本站内部引流

spark dataframe select is a transformation

源码的版本gradle dependencies

dependencies {
    compile 'org.scala-lang:scala-library:2.11.8'
    compileOnly 'org.apache.spark:spark-core_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-sql_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-mllib_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-streaming_2.11:2.3.0'
}

源码内容

  /**
   * Selects a set of column based expressions.
   * {{{
   *   ds.select($"colA", $"colB" + 1)
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(cols: Column*): DataFrame = withPlan {
    Project(cols.map(_.named), planWithBarrier)
  }

  /**
   * Selects a set of columns. This is a variant of `select` that can only select
   * existing columns using column names (i.e. cannot construct expressions).
   *
   * {{{
   *   // The following two are equivalent:
   *   ds.select("colA", "colB")
   *   ds.select($"colA", $"colB")
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

  /**
   * :: Experimental ::
   * Returns a new Dataset by computing the given [[Column]] expression for each element.
   *
   * {{{
   *   val ds = Seq(1, 2, 3).toDS()
   *   val newDS = ds.select(expr("value + 1").as[Int])
   * }}}
   *
   * @group typedrel
   * @since 1.6.0
   */
  @Experimental
  @InterfaceStability.Evolving
  def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
    implicit val encoder = c1.encoder
    val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
      planWithBarrier)

    if (encoder.flat) {
      new Dataset[U1](sparkSession, project, encoder)
    } else {
      // Flattens inner fields of U1
      new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1)
    }
  }

scala.annotation.varargs

@scala.annotation.varargs — Var-Args函数(方法)意味着采用可变数量的参数。

Var-Args Functions(Methods) means taking Variable Number of Arguments. As a Java Developer, We have already tasted usage of Var-Args methods in Java-based Applications.

Scala also follows similar kind of syntax to define Var-Args parameter. But it uses * to define Var-Args parameter.–>Scala还遵循类似的语法来定义Var-Args参数。 但是它使用*定义Var-Args参数。

scala 可变参数

annotation 注解

advanced usages: 高级用法; experimental :实验性
  1. 这个包的作用在包的注释中已经做了最好的说明,标记一些api是实验性 或者 声明一些高级用法.
  2. 它主要利用java.lang.annotation来声明了一些自身的注解类或者接口
  3. public class InterfaceStability, 其中有三个接口分别是:
    (a). public @interface Stable {}; //Stable APIs
    (b). public @interface Evolving {}; //APIs that are meant to evolve towards becoming stable APIs, but are not stable APIs yet
    (c). public @interface Unstable {}; // Unstable APIs, with no guarantee on stability.

强弱类型

@group untypedrel 弱类型转换 | @group typedrel 强类型转换

在Saprk的结构化API中,可以分成两类,“无类型(untyped)”的DataFrame API和“类型化(typed)”的Dataset API.
确切的说Dataframe并不是”无类型”的, 它们有类型,只是类型检查没有那么严格,只检查这些类型是否在 运行时(run-time) 与schema中指定的类型对齐。
Dataset在 编译时(compile-time) 就会检查类型是否符合规范。
Dataset API仅适用于 基于JVM的语言(Scala和Java)。我们可以使用Scala 中的case class或Java bean来进行类型指定。

简单来说,如果转换是弱类型的,它将返回一个Dataframe(确切的说弱类型转换的返回类型还有 Column, RelationalGroupedDataset, DataFrameNaFunctions 和 DataFrameStatFunctions 等),而强类型转换返回的是一个Dataset。

星号的作用

select(cols: Column*) 参数带星号,在调用方式可以采用 冒号下划线星的方式把列表整个传入到方法中,如

select(cols_list.head, cols_list.tail:_*)

泛型类

[U1] : 泛型类, 是scala 语法的 泛化, 类似于java 的尖括号 <>;而UML建模语言中,是一种继承关系,表示一般与特殊的关系,它指定了子类如何特化父类的所有特征和行为。

TypedColumn

TypedColumn  是spark.sql.Column 的 伴生类。TypedColumn is a Column for the types of the input and the output. 也就是我们需要去spark.sql.Column中去看 TypedColumn  的具体实现,而我们在这个方法中就可以看到withInputType是它的内部方法。而它具体做了什么?有什么功能不在此处详细细述。

LinkedInAttic ScANNS 近似邻近搜索 Spark LSH 替换方案

LinkedInAttic ScANNS 是近似邻近搜索的有效方案,对于使用spark LSH模型遇到的一些瓶颈有着天然高校的替换效果。笔者在以前的文章中介绍过Spark LSH的基本情况和主要的缺点,同时对于余弦相似的手动编写代码来实现近似效果的案例我们也介绍过。

不管哪一种情况,在于实际项目中的模型计算效果、spark集群的资源利用率、时间上的消耗等都是差强人意的,尤其是笔者自己的项目某次不稳定的模型运行导致集群的大量资源占用效果,对于实际工程的影响很大。

笔者痛定思痛,把Spark LSH的相似计算过程用 LinkedInAttic ScANNS 替换,以此做基本的记录。

spark 码农

1、原Spark LSH模型计算

  #val string_to_vector = functions.udf((x: String) =>{
  #	Vectors.dense(x.split(",").map(x => x.toDouble))
  #})

val w2vDf = spark.sql(s"select site_id, article_id, vectors from ${word2vec_tb}").
	  withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)
	val brp = new BucketedRandomProjectionLSH()
	  .setBucketLength(bucket_number)
	  .setNumHashTables(hash_number).
	  setSeed(seedNum)
	  .setInputCol("wordvec")
	  .setOutputCol("hashes")
	val brpModel = brp.fit(w2vDf)

对于模型的输入数据主要是分词后的数据集,而分词效果在余弦相似度的文章中有提及,(对于直接存储分词结果的数据落盘方案笔者是不提倡的,而是应该将分词后的所有数据全部都数字化,转成自定义词典的ID值来做后续的所有运算。)

2、原相似结果计算

# 加载LSH模型计算结果
val df = spark.sql(s"select site_id,article_id, vectors, hash_values from ${brpModeltb}").
	  withColumn("hashes", StringToDenseVector($"hash_values")).
	  withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)

# 计算JaccardDistance
val brpDf =  itemModel.approxSimilarityJoin(df, df, threshold , "JaccardDistance").persist(StorageLevel.MEMORY_AND_DISK)

log.info(s"save doc similarity result to ${dsimtb}")
# 计算全量相似结果
  brpDf.select($"datasetA".getField("article_id").as("article_id"),
		$"datasetA".getField("site_id").as("site_id"),
		$"datasetB".getField("article_id").as("id"),
		$"datasetB".getField("site_id").as("city_id"),
		$"JaccardDistance").
		filter(row => row.getAs[Long]("article_id") != row.getAs[Long]("id")  &&
		  row.getAs[Long]("site_id") == row.getAs[Long]("city_id")).
		select($"article_id",$"id",$"site_id").
		groupBy("article_id", "site_id").
		agg(functions.concat_ws(",", functions.collect_set($"id".cast("string"))).as("ids")).
		withColumn("ids", TopN_Sims(num)($"ids")).
		filter(row => !row.getAs[String]("ids").equals("")).write.mode("overwrite").saveAsTable(dsimtb)

上述代码是对全量数据做了相似计算并生成结果。其中有一些字段说明一下,site_id : 城市, article_id: 文章id, vectors和hash_values是做了 HashingTF 之后的LSH模型结算结果

3、原增量相似结果计算

由于Spark LSH的全量计算比较耗时且费资源,故做了增量的计算调度过程,但是正是Spark LSH对于鸡肋的增量计算进一步加大了它低效的定位。

# 加载LSH 模型 并广播模型
val itemModel = BucketedRandomProjectionLSHModel.load(brpModelpath)
val _model = spark.sparkContext.broadcast(itemModel)
# 加载增量数据
val new_sql = s"SELECT  site_id, article_id from  ${ODS_ARTICLE_SITE} WHERE  FROM_UNIXTIME(create_time,'yyyy-MM-dd') ='${dt}' AND status=1 AND type=0 AND site_id IN (${city_values}) "

	val articles = spark.sql(new_sql).join(df,Seq("site_id", "article_id"),"left").
	  filter(row => null != row.getAs("vectors")).rdd.map{row =>
	  row.getAs[Long]("site_id").toString + "#" +
		row.getAs[Long]("article_id").toString + "#" +
		row.getAs[String]("vectors")
	}.collect().toList
# 计算增量的相似结果
 articles.map{line =>
		val values = line.split("#")
		val site_id = values(0)
		val article_id = values(1)
		val vecs = values(2).split(",").map(x => x.toDouble)
		//	  val key = Vectors.sparse(values(1).toInt,values(3).split(",").map(x => x.toInt) ,values(3).split(",").map(x => x.toDouble))
		val key = Vectors.dense(vecs)
		val dd = _model.value.approxNearestNeighbors(df, key , (num * 1.5).toInt ).toDF().
		  filter(nrow => !nrow.getAs[Long]("article_id").toString.equals(article_id) &&
			nrow.getAs[Long]("site_id").toString.equals(site_id)).
		  select("article_id").rdd.map(x => x.getAs[Long]("article_id").toString).collect().toList
		(site_id, article_id, dd.mkString(","))
	  }.toDF("site_id","article_id","ids").
		filter(rr => !rr.getAs[String]("ids").equals("")).
		write.mode("overwrite").saveAsTable(dsimtb)

4、LinkedInAttic ScANNS 替换 Spark LSH

# 新的模型
val model = new CosineSignRandomProjectionNNS()
	  .setNumHashes(num_hash)
	  .setSignatureLength(sig_nature_length)
	  .setJoinParallelism(joinParallelism)
	  .setBucketLimit(bucket_limit)
	  .setShouldSampleBuckets(true)
	  .setNumOutputPartitions(num_partitions)
	  .createModel(dimension_len)
# 加载词向量
	val w2v = w2vDf.select("article_id","wordvec").
	  filter(x=> null != x.getAs("article_id") && null != x.getAs("wordvec") &&
		x.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector].numNonzeros >0).
	  rdd.map{row=>
	  (row.getAs[Long]("article_id"), row.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector])
	}.persist(StorageLevel.MEMORY_AND_DISK_SER)
# 生成相似结果
val nbrs = model.getSelfAllNearestNeighbors(w2v, numCandidates).toDF("article_id","oid","score").persist(StorageLevel.MEMORY_AND_DISK_SER)
	nbrs.join(w2vDf, Seq("article_id"), "left").
	  select("article_id","oid","site_id").
	  withColumnRenamed("site_id","city_id").
	  join(ods, Seq("oid"),"left").select("article_id","oid","city_id","site_id").
	  filter(x=> null != x.getAs("city_id") && null != x.getAs("site_id") &&
		x.getAs[Long]("city_id") == x.getAs[Long]("site_id")).select("article_id","oid").
	  groupBy("article_id").agg(functions.concat_ws(",", functions.collect_set($"oid".cast("string"))).as("ids")).
	  join(w2vDf,Seq("article_id"),"left").select("article_id","site_id","ids").
	  withColumn("article_id", $"article_id".cast("long")).withColumn("site_id", $"site_id".cast("long"))
	  .write.mode("overwrite").
	  saveAsTable(dsimtb)

整体的替换还是比较顺畅的,有一点需要主要 LinkedInAttic ScANNS 和scala的版本对应情况,有的版本编译后运行模型会出现问题。笔者在之前的文章中提到过具体的编译和错误处理方案,以及相关的版本号。

scala add elements 集合添加元素

scala add elements to List or Set,集合添加元素是scala预约经常遇到的操作。。笔者只对该两个集合的基本操作做入门级实验,具体应用中的细节不做过多的说明,请读者自行多多验证,多动手才能卷到往生。

默认情况下,scala 的List 和 Set 都是不变的,如果新增元素或者拼接新的集合其实都是生成了新的实例

List集合添加元素的一些操作

    val list1 =List(4,5,6,7,8)
    println(s"list1 = ${list1}")
    // 增加元素
    val list2 = 1::list1.::(9)
    println(s" :: 主动调用和被动添加 都放在集合前; list2 = ${list2}")
    val list3 = 10+:list1 :+ 11
    println(s"+: :+ 添加到集合前 集合后; list3 = ${list3}")
    // list 集合拼接
    val  list4 = list2 ++ list3
    println(s"两个集合拼接成新的集合; list4 = ${list4}")
    // list集合与list集合的元素拼接
    val list5 = list1 ::: list3
    println(s"list1 ::: list3 = ${list5}")
    val shuzu = Array(1, 3, 4) ++ Array(55,33 ,55)
    println(s"::: 只能用在List , 而++还可以用在 Array 数组: ${ shuzu.mkString(",") }")
list1 = List(4, 5, 6, 7, 8)
 :: 主动调用和被动添加 都放在集合前; list2 = List(1, 9, 4, 5, 6, 7, 8)
+: :+ 添加到集合前 集合后; list3 = List(10, 4, 5, 6, 7, 8, 11)
两个集合拼接成新的集合; list4 = List(1, 9, 4, 5, 6, 7, 8, 10, 4, 5, 6, 7, 8, 11)
list1 ::: list3 = List(4, 5, 6, 7, 8, 10, 4, 5, 6, 7, 8, 11)
::: 只能用在List , 而++还可以用在 Array 数组: 1,3,4,55,33,55

我们在以前的文章中有学习到Array的一些入门操作。读者可以移步自赏

Set集合添加元素的操作

scala set
默认set是不可变的
上图的输出结果
Set()
Set(5)

在上图中我们可以看到set1默认是空集合,它不可以添加元素;可能会有人对+号操作有误解, + 号操作返回的是一个新集合,并不是对set1的修改。而且我们定义set1用的是关键字val 那么set1对象实例本身就是不可以修改的。

    import scala.collection.mutable.Set
    val set1 = Set[Int]()
    set1.add(4)
    set1.add(4)
    set1 += 3
    set1.add(5)
    println(s"before add 6 , set1 = ${set1}")
    set1.add(6)
    println(s"after add 6 , set1 = ${set1}")
    set1.remove(4)
    println(s"after remove 4 , set1 = ${set1}")
    set1 ++= Vector(7, 8)
    println(s"after ++=Vector , set1 = ${set1}")
before add 6 , set1 = Set(5, 3, 4)
after add 6 , set1 = Set(5, 6, 3, 4)
after remove 4 , set1 = Set(5, 6, 3)
after ++=Vector , set1 = Set(5, 6, 3, 7, 8)