LinkedInAttic ScANNS 是近似邻近搜索的有效方案,对于使用spark LSH模型遇到的一些瓶颈有着天然高校的替换效果。笔者在以前的文章中介绍过Spark LSH的基本情况和主要的缺点 ,同时对于余弦相似 的手动编写代码来实现近似效果的案例我们也介绍过。
不管哪一种情况,在于实际项目中的模型计算效果、spark集群的资源利用率、时间上的消耗等都是差强人意的,尤其是笔者自己的项目某次不稳定的模型运行导致集群的大量资源占用效果,对于实际工程的影响很大。
笔者痛定思痛,把Spark LSH的相似计算过程用 LinkedInAttic ScANNS 替换,以此做基本的记录。
spark 码农
1、原Spark LSH模型计算
#val string_to_vector = functions.udf((x: String) =>{
# Vectors.dense(x.split(",").map(x => x.toDouble))
#})
val w2vDf = spark.sql(s"select site_id, article_id, vectors from ${word2vec_tb}").
withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(bucket_number)
.setNumHashTables(hash_number).
setSeed(seedNum)
.setInputCol("wordvec")
.setOutputCol("hashes")
val brpModel = brp.fit(w2vDf)
对于模型的输入数据主要是分词后的数据集,而分词效果在余弦相似度 的文章中有提及,(对于直接存储分词结果的数据落盘方案笔者是不提倡的,而是应该将分词后的所有数据全部都数字化,转成自定义词典的ID值来做后续的所有运算。)
2、原相似结果计算
# 加载LSH模型计算结果
val df = spark.sql(s"select site_id,article_id, vectors, hash_values from ${brpModeltb}").
withColumn("hashes", StringToDenseVector($"hash_values")).
withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)
# 计算JaccardDistance
val brpDf = itemModel.approxSimilarityJoin(df, df, threshold , "JaccardDistance").persist(StorageLevel.MEMORY_AND_DISK)
log.info(s"save doc similarity result to ${dsimtb}")
# 计算全量相似结果
brpDf.select($"datasetA".getField("article_id").as("article_id"),
$"datasetA".getField("site_id").as("site_id"),
$"datasetB".getField("article_id").as("id"),
$"datasetB".getField("site_id").as("city_id"),
$"JaccardDistance").
filter(row => row.getAs[Long]("article_id") != row.getAs[Long]("id") &&
row.getAs[Long]("site_id") == row.getAs[Long]("city_id")).
select($"article_id",$"id",$"site_id").
groupBy("article_id", "site_id").
agg(functions.concat_ws(",", functions.collect_set($"id".cast("string"))).as("ids")).
withColumn("ids", TopN_Sims(num)($"ids")).
filter(row => !row.getAs[String]("ids").equals("")).write.mode("overwrite").saveAsTable(dsimtb)
上述代码是对全量数据做了相似计算并生成结果。其中有一些字段说明一下,site_id : 城市, article_id: 文章id, vectors和hash_values是做了 HashingTF 之后的LSH模型结算结果
3、原增量相似结果计算
由于Spark LSH的全量计算比较耗时且费资源,故做了增量的计算调度过程,但是正是Spark LSH对于鸡肋的增量计算进一步加大了它低效的定位。
# 加载LSH 模型 并广播模型
val itemModel = BucketedRandomProjectionLSHModel.load(brpModelpath)
val _model = spark.sparkContext.broadcast(itemModel)
# 加载增量数据
val new_sql = s"SELECT site_id, article_id from ${ODS_ARTICLE_SITE} WHERE FROM_UNIXTIME(create_time,'yyyy-MM-dd') ='${dt}' AND status=1 AND type=0 AND site_id IN (${city_values}) "
val articles = spark.sql(new_sql).join(df,Seq("site_id", "article_id"),"left").
filter(row => null != row.getAs("vectors")).rdd.map{row =>
row.getAs[Long]("site_id").toString + "#" +
row.getAs[Long]("article_id").toString + "#" +
row.getAs[String]("vectors")
}.collect().toList
# 计算增量的相似结果
articles.map{line =>
val values = line.split("#")
val site_id = values(0)
val article_id = values(1)
val vecs = values(2).split(",").map(x => x.toDouble)
// val key = Vectors.sparse(values(1).toInt,values(3).split(",").map(x => x.toInt) ,values(3).split(",").map(x => x.toDouble))
val key = Vectors.dense(vecs)
val dd = _model.value.approxNearestNeighbors(df, key , (num * 1.5).toInt ).toDF().
filter(nrow => !nrow.getAs[Long]("article_id").toString.equals(article_id) &&
nrow.getAs[Long]("site_id").toString.equals(site_id)).
select("article_id").rdd.map(x => x.getAs[Long]("article_id").toString).collect().toList
(site_id, article_id, dd.mkString(","))
}.toDF("site_id","article_id","ids").
filter(rr => !rr.getAs[String]("ids").equals("")).
write.mode("overwrite").saveAsTable(dsimtb)
4、LinkedInAttic ScANNS 替换 Spark LSH
# 新的模型
val model = new CosineSignRandomProjectionNNS()
.setNumHashes(num_hash)
.setSignatureLength(sig_nature_length)
.setJoinParallelism(joinParallelism)
.setBucketLimit(bucket_limit)
.setShouldSampleBuckets(true)
.setNumOutputPartitions(num_partitions)
.createModel(dimension_len)
# 加载词向量
val w2v = w2vDf.select("article_id","wordvec").
filter(x=> null != x.getAs("article_id") && null != x.getAs("wordvec") &&
x.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector].numNonzeros >0).
rdd.map{row=>
(row.getAs[Long]("article_id"), row.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector])
}.persist(StorageLevel.MEMORY_AND_DISK_SER)
# 生成相似结果
val nbrs = model.getSelfAllNearestNeighbors(w2v, numCandidates).toDF("article_id","oid","score").persist(StorageLevel.MEMORY_AND_DISK_SER)
nbrs.join(w2vDf, Seq("article_id"), "left").
select("article_id","oid","site_id").
withColumnRenamed("site_id","city_id").
join(ods, Seq("oid"),"left").select("article_id","oid","city_id","site_id").
filter(x=> null != x.getAs("city_id") && null != x.getAs("site_id") &&
x.getAs[Long]("city_id") == x.getAs[Long]("site_id")).select("article_id","oid").
groupBy("article_id").agg(functions.concat_ws(",", functions.collect_set($"oid".cast("string"))).as("ids")).
join(w2vDf,Seq("article_id"),"left").select("article_id","site_id","ids").
withColumn("article_id", $"article_id".cast("long")).withColumn("site_id", $"site_id".cast("long"))
.write.mode("overwrite").
saveAsTable(dsimtb)
整体的替换还是比较顺畅的,有一点需要主要 LinkedInAttic ScANNS 和scala的版本对应情况,有的版本编译后运行模型会出现问题。笔者在之前的文章中提到过具体的编译和错误处理 方案,以及相关的版本号。