Mac m1 brew install spark

前言

Mac m1 brew install spark。大数据组件spark入门学习时会有很多系统性的知识需要我们去学习,比如在大数据整个生态圈里spark、Hadoop等组件的角色定位、价值、应用方向、系统安装等。正是这个系统性的思维模式会让我们在各个计算机系统环境中布置spark的时候都惯性的先把Hadoop环境搭建好再依次搭建spark,实际上在Mac笔记本本地安装spark组件的时候是可以不安装Hadoop的,毕竟大多数在Mac上安装spark是为了一些rdd语法的练习,不是为了大量数据的处理。

Mac本身只有自己的软件管理工具的,类似于我们用Linux系统,不同的内核用不同的命令来安装所需的软件或者工具包,如yum, apt-get等。Mac的软件包管理命令是 brew,安装spark环境所需要的所有的软件包都可以用brew来管理。

基本情况

  1. MacBook Pro (13-inch, M1, 2020)
  2. macOS 12.3.1 (21E258)
  3. java version “1.8.0_291” java的安装和环境配置可以通过 文章: http://www.lifefunker.com/archives/51, 来了解具体的操作。有一个事情强调一下,没有bash_profile文件就别自己画蛇添足非要新建一个了…..,没有是因为真的它不再叫这个名字了,找个.zshrc 文件

安装

通过brew安装软件主要也是三步骤:

  • 查找软件包 — brew search
  • 明确软件包 — brew info
  • 安装软件包 — brew install
~ % brew search spark
==> Formulae
apache-spark ✔          spark                   sparkey                 spack                   sparse                  par

==> Casks
spark                               spark-ar-studio                     sparkle                             sparkleshare

命令brew search spark 顾名思义就是查找spark软件包,可以看到返回多个结果,这些结果都是名称,我们要安装的时候都是install对应的名称即可。但是我们发现有Formulae和Casks两个不同的,一般我们按Formulae。

  • 「Formulae」一般是那些命令行工具、开发库、字体、插件等不含 GUI 界面的软件。
  • 「Cask」是指那些含有 GUI 图形化界面的软件,如 Google Chrome、FireFox 、Atom 等。
~ % brew info apache-spark
apache-spark: stable 3.2.1 (bottled), HEAD
Engine for large-scale data processing
https://spark.apache.org/
/usr/local/Cellar/apache-spark/3.2.1 (1,472 files, 322MB) *
  Poured from bottle on 2022-05-16 at 17:36:18
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/apache-spark.rb
License: Apache-2.0
==> Dependencies
Required: openjdk@11 ✔
==> Options
--HEAD
	Install HEAD version
==> Analytics
install: 4,389 (30 days), 13,792 (90 days), 54,640 (365 days)
install-on-request: 4,387 (30 days), 13,777 (90 days), 54,556 (365 days)
 ~ % brew info spark
Warning: Treating spark as a formula. For the cask, use homebrew/cask/spark
spark: stable 1.0.1 (bottled)
Sparklines for the shell
https://zachholman.com/spark/
Not installed
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/spark.rb
License: MIT
==> Analytics
install: 559 (30 days), 1,612 (90 days), 5,995 (365 days)
install-on-request: 544 (30 days), 1,566 (90 days), 5,834 (365 days)
build-error: 0 (30 days)

通过brew info 我们分别查看了 apache-spark 和 spark 的两个软件包的信息,可以看到spark的版本太老旧了,至此我们决定安装apache-spark。

brew install apache-spark

安装完成后,环境变量一般是不会有可执行的命令的,需要我们去zshrc文件中添加对应的sparkhome,但是如果不想那么麻烦就直接去安装的路径下,找到对应的spark-shell文件添加软链来快速打开即可。

spark dataframe select 源码 对源码的小小窥探

spark dataframe 和 dataset 都是spark的数据集合抽象,而dataframe和dataset的关系可以用一行代码来说明,dataframe就是dataset泛化成Row的一种特例。当然本篇的重点不是为了讨论这两个数据集,而是通过对dataframe的select方法源码阅读来给读者提供一个队源码轻读的思路。

DataFrame = Dataset[Row]

dataframe的 select是一个Transformation,这个也不是我们的讨论重点,加上这句话纯粹为本站内部引流

spark dataframe select is a transformation

源码的版本gradle dependencies

dependencies {
    compile 'org.scala-lang:scala-library:2.11.8'
    compileOnly 'org.apache.spark:spark-core_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-sql_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-mllib_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-streaming_2.11:2.3.0'
}

源码内容

  /**
   * Selects a set of column based expressions.
   * {{{
   *   ds.select($"colA", $"colB" + 1)
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(cols: Column*): DataFrame = withPlan {
    Project(cols.map(_.named), planWithBarrier)
  }

  /**
   * Selects a set of columns. This is a variant of `select` that can only select
   * existing columns using column names (i.e. cannot construct expressions).
   *
   * {{{
   *   // The following two are equivalent:
   *   ds.select("colA", "colB")
   *   ds.select($"colA", $"colB")
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

  /**
   * :: Experimental ::
   * Returns a new Dataset by computing the given [[Column]] expression for each element.
   *
   * {{{
   *   val ds = Seq(1, 2, 3).toDS()
   *   val newDS = ds.select(expr("value + 1").as[Int])
   * }}}
   *
   * @group typedrel
   * @since 1.6.0
   */
  @Experimental
  @InterfaceStability.Evolving
  def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
    implicit val encoder = c1.encoder
    val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
      planWithBarrier)

    if (encoder.flat) {
      new Dataset[U1](sparkSession, project, encoder)
    } else {
      // Flattens inner fields of U1
      new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1)
    }
  }

scala.annotation.varargs

@scala.annotation.varargs — Var-Args函数(方法)意味着采用可变数量的参数。

Var-Args Functions(Methods) means taking Variable Number of Arguments. As a Java Developer, We have already tasted usage of Var-Args methods in Java-based Applications.

Scala also follows similar kind of syntax to define Var-Args parameter. But it uses * to define Var-Args parameter.–>Scala还遵循类似的语法来定义Var-Args参数。 但是它使用*定义Var-Args参数。

scala 可变参数

annotation 注解

advanced usages: 高级用法; experimental :实验性
  1. 这个包的作用在包的注释中已经做了最好的说明,标记一些api是实验性 或者 声明一些高级用法.
  2. 它主要利用java.lang.annotation来声明了一些自身的注解类或者接口
  3. public class InterfaceStability, 其中有三个接口分别是:
    (a). public @interface Stable {}; //Stable APIs
    (b). public @interface Evolving {}; //APIs that are meant to evolve towards becoming stable APIs, but are not stable APIs yet
    (c). public @interface Unstable {}; // Unstable APIs, with no guarantee on stability.

强弱类型

@group untypedrel 弱类型转换 | @group typedrel 强类型转换

在Saprk的结构化API中,可以分成两类,“无类型(untyped)”的DataFrame API和“类型化(typed)”的Dataset API.
确切的说Dataframe并不是”无类型”的, 它们有类型,只是类型检查没有那么严格,只检查这些类型是否在 运行时(run-time) 与schema中指定的类型对齐。
Dataset在 编译时(compile-time) 就会检查类型是否符合规范。
Dataset API仅适用于 基于JVM的语言(Scala和Java)。我们可以使用Scala 中的case class或Java bean来进行类型指定。

简单来说,如果转换是弱类型的,它将返回一个Dataframe(确切的说弱类型转换的返回类型还有 Column, RelationalGroupedDataset, DataFrameNaFunctions 和 DataFrameStatFunctions 等),而强类型转换返回的是一个Dataset。

星号的作用

select(cols: Column*) 参数带星号,在调用方式可以采用 冒号下划线星的方式把列表整个传入到方法中,如

select(cols_list.head, cols_list.tail:_*)

泛型类

[U1] : 泛型类, 是scala 语法的 泛化, 类似于java 的尖括号 <>;而UML建模语言中,是一种继承关系,表示一般与特殊的关系,它指定了子类如何特化父类的所有特征和行为。

TypedColumn

TypedColumn  是spark.sql.Column 的 伴生类。TypedColumn is a Column for the types of the input and the output. 也就是我们需要去spark.sql.Column中去看 TypedColumn  的具体实现,而我们在这个方法中就可以看到withInputType是它的内部方法。而它具体做了什么?有什么功能不在此处详细细述。

LinkedInAttic ScANNS 近似邻近搜索 Spark LSH 替换方案

LinkedInAttic ScANNS 是近似邻近搜索的有效方案,对于使用spark LSH模型遇到的一些瓶颈有着天然高校的替换效果。笔者在以前的文章中介绍过Spark LSH的基本情况和主要的缺点,同时对于余弦相似的手动编写代码来实现近似效果的案例我们也介绍过。

不管哪一种情况,在于实际项目中的模型计算效果、spark集群的资源利用率、时间上的消耗等都是差强人意的,尤其是笔者自己的项目某次不稳定的模型运行导致集群的大量资源占用效果,对于实际工程的影响很大。

笔者痛定思痛,把Spark LSH的相似计算过程用 LinkedInAttic ScANNS 替换,以此做基本的记录。

spark 码农

1、原Spark LSH模型计算

  #val string_to_vector = functions.udf((x: String) =>{
  #	Vectors.dense(x.split(",").map(x => x.toDouble))
  #})

val w2vDf = spark.sql(s"select site_id, article_id, vectors from ${word2vec_tb}").
	  withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)
	val brp = new BucketedRandomProjectionLSH()
	  .setBucketLength(bucket_number)
	  .setNumHashTables(hash_number).
	  setSeed(seedNum)
	  .setInputCol("wordvec")
	  .setOutputCol("hashes")
	val brpModel = brp.fit(w2vDf)

对于模型的输入数据主要是分词后的数据集,而分词效果在余弦相似度的文章中有提及,(对于直接存储分词结果的数据落盘方案笔者是不提倡的,而是应该将分词后的所有数据全部都数字化,转成自定义词典的ID值来做后续的所有运算。)

2、原相似结果计算

# 加载LSH模型计算结果
val df = spark.sql(s"select site_id,article_id, vectors, hash_values from ${brpModeltb}").
	  withColumn("hashes", StringToDenseVector($"hash_values")).
	  withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)

# 计算JaccardDistance
val brpDf =  itemModel.approxSimilarityJoin(df, df, threshold , "JaccardDistance").persist(StorageLevel.MEMORY_AND_DISK)

log.info(s"save doc similarity result to ${dsimtb}")
# 计算全量相似结果
  brpDf.select($"datasetA".getField("article_id").as("article_id"),
		$"datasetA".getField("site_id").as("site_id"),
		$"datasetB".getField("article_id").as("id"),
		$"datasetB".getField("site_id").as("city_id"),
		$"JaccardDistance").
		filter(row => row.getAs[Long]("article_id") != row.getAs[Long]("id")  &&
		  row.getAs[Long]("site_id") == row.getAs[Long]("city_id")).
		select($"article_id",$"id",$"site_id").
		groupBy("article_id", "site_id").
		agg(functions.concat_ws(",", functions.collect_set($"id".cast("string"))).as("ids")).
		withColumn("ids", TopN_Sims(num)($"ids")).
		filter(row => !row.getAs[String]("ids").equals("")).write.mode("overwrite").saveAsTable(dsimtb)

上述代码是对全量数据做了相似计算并生成结果。其中有一些字段说明一下,site_id : 城市, article_id: 文章id, vectors和hash_values是做了 HashingTF 之后的LSH模型结算结果

3、原增量相似结果计算

由于Spark LSH的全量计算比较耗时且费资源,故做了增量的计算调度过程,但是正是Spark LSH对于鸡肋的增量计算进一步加大了它低效的定位。

# 加载LSH 模型 并广播模型
val itemModel = BucketedRandomProjectionLSHModel.load(brpModelpath)
val _model = spark.sparkContext.broadcast(itemModel)
# 加载增量数据
val new_sql = s"SELECT  site_id, article_id from  ${ODS_ARTICLE_SITE} WHERE  FROM_UNIXTIME(create_time,'yyyy-MM-dd') ='${dt}' AND status=1 AND type=0 AND site_id IN (${city_values}) "

	val articles = spark.sql(new_sql).join(df,Seq("site_id", "article_id"),"left").
	  filter(row => null != row.getAs("vectors")).rdd.map{row =>
	  row.getAs[Long]("site_id").toString + "#" +
		row.getAs[Long]("article_id").toString + "#" +
		row.getAs[String]("vectors")
	}.collect().toList
# 计算增量的相似结果
 articles.map{line =>
		val values = line.split("#")
		val site_id = values(0)
		val article_id = values(1)
		val vecs = values(2).split(",").map(x => x.toDouble)
		//	  val key = Vectors.sparse(values(1).toInt,values(3).split(",").map(x => x.toInt) ,values(3).split(",").map(x => x.toDouble))
		val key = Vectors.dense(vecs)
		val dd = _model.value.approxNearestNeighbors(df, key , (num * 1.5).toInt ).toDF().
		  filter(nrow => !nrow.getAs[Long]("article_id").toString.equals(article_id) &&
			nrow.getAs[Long]("site_id").toString.equals(site_id)).
		  select("article_id").rdd.map(x => x.getAs[Long]("article_id").toString).collect().toList
		(site_id, article_id, dd.mkString(","))
	  }.toDF("site_id","article_id","ids").
		filter(rr => !rr.getAs[String]("ids").equals("")).
		write.mode("overwrite").saveAsTable(dsimtb)

4、LinkedInAttic ScANNS 替换 Spark LSH

# 新的模型
val model = new CosineSignRandomProjectionNNS()
	  .setNumHashes(num_hash)
	  .setSignatureLength(sig_nature_length)
	  .setJoinParallelism(joinParallelism)
	  .setBucketLimit(bucket_limit)
	  .setShouldSampleBuckets(true)
	  .setNumOutputPartitions(num_partitions)
	  .createModel(dimension_len)
# 加载词向量
	val w2v = w2vDf.select("article_id","wordvec").
	  filter(x=> null != x.getAs("article_id") && null != x.getAs("wordvec") &&
		x.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector].numNonzeros >0).
	  rdd.map{row=>
	  (row.getAs[Long]("article_id"), row.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector])
	}.persist(StorageLevel.MEMORY_AND_DISK_SER)
# 生成相似结果
val nbrs = model.getSelfAllNearestNeighbors(w2v, numCandidates).toDF("article_id","oid","score").persist(StorageLevel.MEMORY_AND_DISK_SER)
	nbrs.join(w2vDf, Seq("article_id"), "left").
	  select("article_id","oid","site_id").
	  withColumnRenamed("site_id","city_id").
	  join(ods, Seq("oid"),"left").select("article_id","oid","city_id","site_id").
	  filter(x=> null != x.getAs("city_id") && null != x.getAs("site_id") &&
		x.getAs[Long]("city_id") == x.getAs[Long]("site_id")).select("article_id","oid").
	  groupBy("article_id").agg(functions.concat_ws(",", functions.collect_set($"oid".cast("string"))).as("ids")).
	  join(w2vDf,Seq("article_id"),"left").select("article_id","site_id","ids").
	  withColumn("article_id", $"article_id".cast("long")).withColumn("site_id", $"site_id".cast("long"))
	  .write.mode("overwrite").
	  saveAsTable(dsimtb)

整体的替换还是比较顺畅的,有一点需要主要 LinkedInAttic ScANNS 和scala的版本对应情况,有的版本编译后运行模型会出现问题。笔者在之前的文章中提到过具体的编译和错误处理方案,以及相关的版本号。