LinkedInAttic ScANNS 近似邻近搜索 Spark LSH 替换方案

LinkedInAttic ScANNS 是近似邻近搜索的有效方案,对于使用spark LSH模型遇到的一些瓶颈有着天然高校的替换效果。笔者在以前的文章中介绍过Spark LSH的基本情况和主要的缺点,同时对于余弦相似的手动编写代码来实现近似效果的案例我们也介绍过。

不管哪一种情况,在于实际项目中的模型计算效果、spark集群的资源利用率、时间上的消耗等都是差强人意的,尤其是笔者自己的项目某次不稳定的模型运行导致集群的大量资源占用效果,对于实际工程的影响很大。

笔者痛定思痛,把Spark LSH的相似计算过程用 LinkedInAttic ScANNS 替换,以此做基本的记录。

spark 码农

1、原Spark LSH模型计算

  #val string_to_vector = functions.udf((x: String) =>{
  #	Vectors.dense(x.split(",").map(x => x.toDouble))
  #})

val w2vDf = spark.sql(s"select site_id, article_id, vectors from ${word2vec_tb}").
	  withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)
	val brp = new BucketedRandomProjectionLSH()
	  .setBucketLength(bucket_number)
	  .setNumHashTables(hash_number).
	  setSeed(seedNum)
	  .setInputCol("wordvec")
	  .setOutputCol("hashes")
	val brpModel = brp.fit(w2vDf)

对于模型的输入数据主要是分词后的数据集,而分词效果在余弦相似度的文章中有提及,(对于直接存储分词结果的数据落盘方案笔者是不提倡的,而是应该将分词后的所有数据全部都数字化,转成自定义词典的ID值来做后续的所有运算。)

2、原相似结果计算

# 加载LSH模型计算结果
val df = spark.sql(s"select site_id,article_id, vectors, hash_values from ${brpModeltb}").
	  withColumn("hashes", StringToDenseVector($"hash_values")).
	  withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)

# 计算JaccardDistance
val brpDf =  itemModel.approxSimilarityJoin(df, df, threshold , "JaccardDistance").persist(StorageLevel.MEMORY_AND_DISK)

log.info(s"save doc similarity result to ${dsimtb}")
# 计算全量相似结果
  brpDf.select($"datasetA".getField("article_id").as("article_id"),
		$"datasetA".getField("site_id").as("site_id"),
		$"datasetB".getField("article_id").as("id"),
		$"datasetB".getField("site_id").as("city_id"),
		$"JaccardDistance").
		filter(row => row.getAs[Long]("article_id") != row.getAs[Long]("id")  &&
		  row.getAs[Long]("site_id") == row.getAs[Long]("city_id")).
		select($"article_id",$"id",$"site_id").
		groupBy("article_id", "site_id").
		agg(functions.concat_ws(",", functions.collect_set($"id".cast("string"))).as("ids")).
		withColumn("ids", TopN_Sims(num)($"ids")).
		filter(row => !row.getAs[String]("ids").equals("")).write.mode("overwrite").saveAsTable(dsimtb)

上述代码是对全量数据做了相似计算并生成结果。其中有一些字段说明一下,site_id : 城市, article_id: 文章id, vectors和hash_values是做了 HashingTF 之后的LSH模型结算结果

3、原增量相似结果计算

由于Spark LSH的全量计算比较耗时且费资源,故做了增量的计算调度过程,但是正是Spark LSH对于鸡肋的增量计算进一步加大了它低效的定位。

# 加载LSH 模型 并广播模型
val itemModel = BucketedRandomProjectionLSHModel.load(brpModelpath)
val _model = spark.sparkContext.broadcast(itemModel)
# 加载增量数据
val new_sql = s"SELECT  site_id, article_id from  ${ODS_ARTICLE_SITE} WHERE  FROM_UNIXTIME(create_time,'yyyy-MM-dd') ='${dt}' AND status=1 AND type=0 AND site_id IN (${city_values}) "

	val articles = spark.sql(new_sql).join(df,Seq("site_id", "article_id"),"left").
	  filter(row => null != row.getAs("vectors")).rdd.map{row =>
	  row.getAs[Long]("site_id").toString + "#" +
		row.getAs[Long]("article_id").toString + "#" +
		row.getAs[String]("vectors")
	}.collect().toList
# 计算增量的相似结果
 articles.map{line =>
		val values = line.split("#")
		val site_id = values(0)
		val article_id = values(1)
		val vecs = values(2).split(",").map(x => x.toDouble)
		//	  val key = Vectors.sparse(values(1).toInt,values(3).split(",").map(x => x.toInt) ,values(3).split(",").map(x => x.toDouble))
		val key = Vectors.dense(vecs)
		val dd = _model.value.approxNearestNeighbors(df, key , (num * 1.5).toInt ).toDF().
		  filter(nrow => !nrow.getAs[Long]("article_id").toString.equals(article_id) &&
			nrow.getAs[Long]("site_id").toString.equals(site_id)).
		  select("article_id").rdd.map(x => x.getAs[Long]("article_id").toString).collect().toList
		(site_id, article_id, dd.mkString(","))
	  }.toDF("site_id","article_id","ids").
		filter(rr => !rr.getAs[String]("ids").equals("")).
		write.mode("overwrite").saveAsTable(dsimtb)

4、LinkedInAttic ScANNS 替换 Spark LSH

# 新的模型
val model = new CosineSignRandomProjectionNNS()
	  .setNumHashes(num_hash)
	  .setSignatureLength(sig_nature_length)
	  .setJoinParallelism(joinParallelism)
	  .setBucketLimit(bucket_limit)
	  .setShouldSampleBuckets(true)
	  .setNumOutputPartitions(num_partitions)
	  .createModel(dimension_len)
# 加载词向量
	val w2v = w2vDf.select("article_id","wordvec").
	  filter(x=> null != x.getAs("article_id") && null != x.getAs("wordvec") &&
		x.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector].numNonzeros >0).
	  rdd.map{row=>
	  (row.getAs[Long]("article_id"), row.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector])
	}.persist(StorageLevel.MEMORY_AND_DISK_SER)
# 生成相似结果
val nbrs = model.getSelfAllNearestNeighbors(w2v, numCandidates).toDF("article_id","oid","score").persist(StorageLevel.MEMORY_AND_DISK_SER)
	nbrs.join(w2vDf, Seq("article_id"), "left").
	  select("article_id","oid","site_id").
	  withColumnRenamed("site_id","city_id").
	  join(ods, Seq("oid"),"left").select("article_id","oid","city_id","site_id").
	  filter(x=> null != x.getAs("city_id") && null != x.getAs("site_id") &&
		x.getAs[Long]("city_id") == x.getAs[Long]("site_id")).select("article_id","oid").
	  groupBy("article_id").agg(functions.concat_ws(",", functions.collect_set($"oid".cast("string"))).as("ids")).
	  join(w2vDf,Seq("article_id"),"left").select("article_id","site_id","ids").
	  withColumn("article_id", $"article_id".cast("long")).withColumn("site_id", $"site_id".cast("long"))
	  .write.mode("overwrite").
	  saveAsTable(dsimtb)

整体的替换还是比较顺畅的,有一点需要主要 LinkedInAttic ScANNS 和scala的版本对应情况,有的版本编译后运行模型会出现问题。笔者在之前的文章中提到过具体的编译和错误处理方案,以及相关的版本号。

SparkMl-BucketedRandomProjectionLSH(欧几里德距离度量-局部敏感哈希)

LSH(Locality Sensitive Hashing)翻译成中文,叫做“局部敏感哈希”,它是一种针对海量高维数据的快速最近邻查找算法。可以用于推荐,例如用某用户的历史浏览信息,做信息推荐

BucketedRandomProjectionLSH是一个Estimator。我们在做大规模文本相似度计算的时候我们可能会用的模型,当然除了它之外还有一个MinHashLSH。MinHash 是一个用于Jaccard 距离的 LSH family。而BucketedRandomProjectionLSH是对MinHashLSH的一种优化,增加了的桶的概念,来降低整个计算的负责度。

参数信息参数描述备注
setInputColDF中待变换的特征特征类型必须为:vector
setOutputCol变换后的特征名称
setBucketLength每个哈希桶的长度必填
setNumHashTables哈希表的数量默认1
setSeed随机种子随机数生成使用,默认:772209414
approxSimilarityJoin(datasetA, datasetB, threshold)过滤模型分析数据集A与B中距离大于等于threshold数据
approxNearestNeighbors(datasetA, key, numNearestNeighbors)Spark的LSH的输出TopK
注:模型的输入特征可以是连续特征也可以是离散特征

离散特征代码示例

//特征名称
var features = Array("weight", "height", "age")
//字段转换成特征向量
var splitDatas = new VectorAssembler()
.setInputCols(features)
.setOutputCol("vector_features")
.transform(dataFrame.select("id", features:_*))
.randomSplit(Array(0.4, 0.3, 0.3))
//训练模型
var model:BucketedRandomProjectionLSHModel = new BucketedRandomProjectionLSH()
.setInputCol("vector_features")         //待变换的特征
.setOutputCol("bkt_lsh")                //变换后的特征名称
.setBucketLength(10d)                   //每个哈希桶的长度,更大的桶降低了假阴性率
.setNumHashTables(5)                    //哈希表的数量,散列表数量的增加降低了错误的否定率,如果降低它的值则会提高运行性能
.setSeed(100L)                          //随机种子
.fit(splitDatas.apply(0))               //训练
//通过模型转换数据
var transform = model.transform(splitDatas.apply(0))
transform.show(10, 100)
transform.printSchema()
//推荐信息,获取相关性较高的数据
var recommend= model.approxSimilarityJoin(splitDatas.apply(1), splitDatas.apply(2), 2, "distCol")
.select(
    col("datasetA").getField("id").as("id"), 
    col("datasetB").getField("id").as("recommend_id"), 
    col("datasetA").getField("age").as("age"), 
    col("datasetB").getField("age").as("recommend_age"), 
    col("datasetA").getField("weight").as("weight"), 
    col("datasetB").getField("weight").as("recommend_weight"), 
    col("datasetA").getField("height").as("height"), 
    col("datasetB").getField("height").as("recommend_height"),
    col("distCol")
)
recommend.orderBy("id", "distCol").show(100, 1000)

连续特征代码示例

	// df 数据集是dataframe,并且words字段是格式是 ["我","爱","北京","天安门"]的词列表Array[String]
    val word2Vec = new Word2Vec()
	  .setInputCol("words")
	  .setOutputCol("wordvec")
	  .setVectorSize(10)
	  .setMinCount(0)
	val wvModel = word2Vec.fit(df)
	val w2vDf = wvModel.transform(df)

	val brp = new BucketedRandomProjectionLSH()
	  .setBucketLength(4d)
	  .setNumHashTables(10)
	  .setInputCol("wordvec")
	  .setOutputCol("hashes")
	val brpModel = brp.fit(w2vDf)
	val tsDf = brpModel.transform(w2vDf)

	val key_value = List(0.13868775751827092,-0.11639275898904025,0.19808808788014898,0.2722799372859299,0.08275626220836721,-0.2846828463712129,0.2887565325463897,-0.05958885527697617,0.042977130971848965,-0.03787828763497287)
	val key = Vectors.dense(key_value.toArray)
	val a = brpModel.approxNearestNeighbors(tsDf, key , 3).toDF()

敲黑板:
LSH模型的使用中最关键的还是要看setBucketLength和setNumHashTables两个参数的设置。因为这两个参数决定了模型的计算效果和性能。当然这之前的输入数据结构也很重要,我们抛开输入数据的差异性不说来谈这两个参数。总的来说这两个参数的数值尽可能小一些,这样性能会高,但是推荐结果可能准确率低一些,这本身是一个博弈的过程。

缺点们:

  • 计算不稳定:Spark的LSH动不动卡着不动或者慢或者OOM,主要原因是join步骤相当消耗资源和桶内数据倾斜导致,然而在倾斜的桶内暴力搜索可能是不值得的,因为相似度数据对可能也在另一个不倾斜的桶内出现了
  • 数据丢失:调用approxSimilarityJoin会莫名其妙的丢失实体,比如输入1000个实体做最近邻50个检索,最后只输出了200个实体的top50,这个问题不是半径太小导致的,而是哈希之后没有任何一条(hash_table,哈希值)一样可以join上的数据对,这个问题是参数设置导致的,LSH调参比较蛋疼
  • 不能对所有实体输出TopK:Spark的LSH的approxNearestNeighbors是输出TopK,和需求完全切合,但是这个API不支持在全表操作,只能输出一个实体进行推荐,所以只能使用join方法再对join到的数据对进行排序取topK,相当浪费计算资源
  • 不支持余弦相似度:Spark的BucketedRandomProjectionLSH不支持余弦相似度,这个影响不大,可以先做一步归一化然后用欧氏距离,但是不是很方便