hive tf idf 基于hive sql的方式实现词频-逆文档频率(tf idf)的计算

TF-IDF(Term Frequency-Inverse Document Frequency, 词频-逆文件频率)

是一种用于信息检索与文本挖掘的常用加权技术。tf-idf是一种统计方法,用以评估一字词对于一个文件集或一个语料库中的其中一份文件的重要程度。字词的重要性随着它在文件中出现的次数成正比增加,但同时会随着它在语料库中出现的频率成反比下降。tf-idf加权的各种形式常被搜索引擎应用,作为文件与用户查询之间相关程度的度量或评级。除了tf-idf以外,互联网上的搜索引擎还会使用基于链接分析的评级方法,以确定文件在搜索结果中出现的顺序。

计算tf-idf的方式很多,如果我们用spark计算也会有现成的工具包来实现,但是对于很多sql boys 或者sql girls 来说,可能更习惯于用sql语句来实现这个值的计算。此篇文章我们抛开其它的编程语言,就用hive sql来计算语料库中所有词的tf-idf,并取得一篇文章(document)的关键词(keywords)。

1、基本公式

我们不再对每个统计值的定义和含义做过多的解释,很多文章中都会介绍到,我们只贴出基础的公式。

词频计算公式
逆文档频率计算公式
tf-idf计算公式

2、实现逻辑及流程

第一步,先把可能用到的中间表建出来,我们的建表没有业务硬要求,也没有特定的特定的模式,只是根据我们的公式分成了三张表,来逐步体现公式内容。第一张表示构建了所有词的基础表,为的是确定每一个词的id; 第二张表是建立一张每一个词和文章的关联表,目的是把词转化成id;第三张表是集合了所有计算所需要用的统计值。

-- 构建关键词字典表
create table if not exists key_word_dict(id int , word string) comment '关键词字典表';
-- 构建文章和词的关联表
create table if not exists article_word_dict(word_id int, article_id int) comment '文章和词的关联表';
-- tf_idf词频-逆文档频率基础表
create table if not exists article_word_tf_idf 
(
    article_id int comment '文章id'
    ,article_total_word_cnt int comment '文章总词数'
    ,word_id int comment '词id'
    ,word_in_article_cnt int comment '文章中出现词的次数'
    ,cnt_article_with_word int comment '出现该词的文章数'
    ,total_article int comment '语料文章总数'
    ,tf double comment '词频'
    ,idf double comment '逆文档频率'
    ,tf_idf double
) comment 'tf_idf词频-逆文档频率基础表';

第二步,对每一张表的内容开始初始化或者生成对应的统计值。词的基础表我们是从原始表中的整合汇总出来的词表,并且对一些不规则的词做了过滤清洗的处理。

-- 初始化词的基础表数据
insert overwrite table key_word_dict
select  
        row_number()over() as id,
        a.keyword as word
from    (
            select  trim(kws) as keyword
            from    ods_article_search fcj
            left join ods_article_detail detail
            on      fcj.article_id = detail.article_id
            lateral view explode(split(detail.keywords, ',')) ws as kws
            where   detail.article_id is not null
            group by kws
        ) a
WHERE   trim(a.keyword) IS NOT NULL
AND     trim(a.keyword) != ""
AND     trim(a.keyword) != "null"
AND     length(trim(a.keyword)) > 1
AND     length(trim(a.keyword)) < 10
AND     (trim(a.keyword) NOT REGEXP ('\”'))
AND     (trim(a.keyword) NOT REGEXP ('='))
AND     (trim(a.keyword) NOT REGEXP ('-'))
AND     (trim(a.keyword) NOT REGEXP (','))
AND     (trim(a.keyword) NOT REGEXP ("\""))
AND     (trim(a.keyword) NOT REGEXP (" "))
AND     (trim(a.keyword) NOT REGEXP ("、"))
AND     (trim(a.keyword) NOT REGEXP (":"))
AND     (trim(a.keyword) NOT REGEXP (":"))
AND     (trim(a.keyword) NOT REGEXP ("\\."))
AND     (trim(a.keyword) NOT REGEXP ("\/"))
AND     (trim(a.keyword) NOT REGEXP ("?"))
AND     (trim(a.keyword) NOT REGEXP ("。"))
AND     (trim(a.keyword) NOT REGEXP ("\\("))
AND     (trim(a.keyword) NOT REGEXP ("\\?"))
AND     (trim(a.keyword) NOT REGEXP ("!"))
AND     (trim(a.keyword) NOT REGEXP ("\\%"))
AND     (trim(a.keyword) NOT REGEXP ("\\."))
AND     (trim(a.keyword) NOT REGEXP ("\\~"))
AND     (trim(a.keyword) NOT REGEXP ("^[0-9]+$"))
AND     (trim(a.keyword) NOT REGEXP ("[0-9]+$"))
AND     (trim(a.keyword) NOT REGEXP ("^[0-9]+"))
;

从分词表中拿到每一篇文章和分词后每个词与其的对应关系,分词表是我们之前就做好。而分词的方式我们可以通过之前的文章所介绍的利用spark + ansj来实现。

-- 生成文章和词关联的表
 insert overwrite table article_word_dict
 select  dict.id
         ,a.article_id
 from    key_word_dict dict
 left join (
               select  fcj.article_id
                       ,trim(kws) as keyword
               from    ods_article_search fcj
               left join dwd_article_key_words detail
               on      fcj.article_id = detail.article_id
               lateral view explode(words) ws as kws
               where   detail.article_id is not null
           ) a
 on      dict.word = a.keyword
 where   a.keyword is not null
 and     a.article_id is not null
 ;
-- 生成tf-idf基础表数据
insert overwrite table article_word_tf_idf 
select  e.article_id
        ,e.article_total_word_cnt
        ,e.word_id
        ,e.word_in_article_cnt
        ,f.cnt_article_with_word
        ,f.total_article
        ,e.tf
        ,f.idf
        ,e.tf * f.idf as tf_idf
from    (
            select  b.article_id
                    ,b.article_total_word_cnt
                    ,a.word_id
                    ,a.word_in_article_cnt
                    ,a.word_in_article_cnt / b.article_total_word_cnt as tf
            from    (
                        select  sum(1) article_total_word_cnt
                                ,article_id
                        from    article_word_dict
                        group by article_id
                    ) b
            left join (
                          select  count(word_id) word_in_article_cnt
                                  ,article_id
                                  ,word_id
                          from    article_word_dict
                          group by article_id
                                   ,word_id
                      ) a
            on      b.article_id = a.article_id
            where   a.article_id is not null
            and     a.article_id is not null
        ) e
left join (
              select  d.word_id
                      ,log(d.total_article / ( d.cnt_article_with_word +1)) as idf
                      ,d.total_article
                      ,d.cnt_article_with_word
              from    (
                          select  c.word_id
                                  ,c.cnt_article_with_word
                                  ,(
                                      select count(distinct article_id) as article_cnt from article_word_dict
                                  ) as total_article
                          from    (
                                      select  count(distinct article_id) as cnt_article_with_word
                                              ,word_id
                                      from    article_word_dict
                                      group by word_id
                                  ) c
                      ) d
          ) f
on      e.word_id = f.word_id
where   f.word_id is not null
and     e.word_id is not null
;

最后,我们通过tf-idf的基础表可以取一篇文章的前5个关键词,会用到hive sql的 rank over 开窗函数的组合。基本的展示效果如下图

select  *
from    (
            select  a.article_id
                    ,a.tf_idf
                    ,b.word
                    ,rank()over(partition by a.article_id order by a.tf_idf desc ) as sort_num
            from    article_word_tf_idf a
            left join key_word_dict b
            on      a.word_id = b.id
            where   b.id is not null
        ) a
where   a.sort_num <= 5
;
取文章前5名的关键词

spark scala 分词 利用 ansj jsoup 对带有页面标签的文章分词

场景描述:我们有很多文章需要进行分词处理,而文章存储的内容是带有html标签的;我们需要利用自定义的字典来对文章进行分词。这里我们使用ansj来自定义字典和停留词进行分词,同时还需要先用jsoup对文章的内容从html标签中提取出来。

1、环境说明

如果开发或者生成环境是自建的spark运行环境,记得通过maven或者gradle引入 ansj, 对应的版本需要根据您spark的版本去maven库检索并且maven检索到响应的jar包后可以拿到maven或者gradle的引入方式;如果读者用到了阿里云的MaxCompute计算引擎,在dataworks上搭建环境,则需要构建如下的ODPS Spark,把ansj_seg-**.jar, nlp-lang-**.jar, spark-interpreter-**.jar 加入到dataworks的生存环境中即可。

ODPS Spark 配置界面

2、主要实现逻辑

第一步,先构建一个自定义的字典库,这个库的实现方式是多样的,可以通过预处理的代码生成一个文本文件,一行一个词的形式并把文件整个放到hdfs 或者其它对象存储的路径下,又或者把这些词放贷一张spark可以读取的表中;之后分词之前加载进来即可。

第二步,预加载自定义的词库初始化ansj的自定义词典;利用spark广播的方式,把自定义的词典广播出去。

第三步,利用广播的字典在每个分区里处理每一篇文章(分词之前去html标签)

笔者自定义词库是放到了hive表中,通过hive + sparksql的方式预处理生成的词库;而在加载词库生成自定义词典时的代码如下,之所以用flatmap是因为之前预处理的词库一行用逗号连接了多个词,如果读者是一行一个词直接用map处理即可。

返回自定义词典之后,把这个词典通过broadcast广播

/**
	* 加载自定义词典
	* @param spark
	* @return
	*/
  def load_KeyWords(spark: SparkSession): Forest ={
	//自定义词库的key名(可以任意定义)
	val dicKey = "articleDic"

	val keywords = spark.sql(s"select keywords from artile_keywords_table")
	val words = keywords.rdd.filter(x => null != x).flatMap{row =>
	  row.getAs[String]("keywords").split(",").filter(x=> null != x && !x.trim.equals("") && !x.trim.equals("null") && x.trim.length > 1)
	}.distinct().persist(StorageLevel.MEMORY_AND_DISK)
	val word_num = words.count()
	words.collect().foreach(word => DicLibrary.insertOrCreate(dicKey, word.trim, DicLibrary.DEFAULT_NATURE, DicLibrary.DEFAULT_FREQ))
	log.info(s"user dict words is ${word_num}")
	words.unpersist()
	DicLibrary.get(dicKey)
  }

	  val user_dict = load_KeyWords(spark)
	  //广播自定义词典
	  val userDic = spark.sparkContext.broadcast(user_dict)

加载所有的文章,利用字典分词,文章加载及处理分词的主要逻辑代码如下,查询hive表中文章的id,标题和内容,通过mapPartitions算子把每一篇文章都分词

      // 文章的id, 标题,内容 
      val sql=s"SELECT a.article_id,a.title, b.content from article_content a"
	  val df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK)
      res.write.mode("overwrite").saveAsTable(ARTICLE_SPLIT_WORDS_TB)
	  val res = df.rdd.mapPartitions(
		iter => {
		  iter.map(line =>
			(line.getAs("article_id").toString.trim.toLong, dict_parse_words(userDic.value, line.getAs[String]("title") + line.getAs[String]("content") ))
		  )
		}
	  ).toDF("article_id","words")

分词的时候还自定义了一些需要过滤的词性以及停留词(停留词没有做过多的处理,因为项目有其它业务逻辑的应用所有尽可能多的留下了全部的有效分词)

def dict_parse_words(dic: Forest=DicLibrary.get(DicLibrary.DEFAULT), text: String): Map[String, Int] ={
	//分词准备
	val stop = new StopRecognition()
	stop.insertStopNatures("w")//过滤掉标点
	stop.insertStopNatures("t")//过滤掉时间
	stop.insertStopNatures("m")//过滤掉m词性
	stop.insertStopNatures("null")//过滤null词性
	stop.insertStopNatures("<br />")//过滤<br />词性
	stop.insertStopNatures("v") // 过滤动词
	stop.insertStopNatures("q") // 过滤量词
	stop.insertStopNatures("u") // 过滤助词
	stop.insertStopNatures("y") // 过滤语气词
	stop.insertStopNatures(":")
	stop.insertStopNatures("'")
	stop.insertStopWords("<p>")
	stop.insertStopWords("<td>")
	stop.insertStopWords("align","<",">","text", "style", "strong", ")","(")
    // 利用jsoup提取html的内容
	val _text = jsoup.Jsoup.parse(text)
	val _parse = DicAnalysis.parse(_text.body().text(), dic).recognition(stop)

	val words = _parse.getTerms.toArray.toList.filter(x =>
	  x.asInstanceOf[Term].natrue().natureStr.equals(DicLibrary.DEFAULT_NATURE) &&
	  x.asInstanceOf[Term].getRealName.length > 1 &&
	  !Tools.isNumber(x.asInstanceOf[Term].getRealName) &&
	  !x.asInstanceOf[Term].getRealName.trim.equals(""))
	words.map(x => x.toString.split("/")(0)).groupBy(l=>l).map(t => (t._1,t._2.length))
  }

spark mllib 余弦相似度 文本相似推荐 scala代码实现

文本相似推荐是推荐系统中最常见的一个方向,之前的文章中我们介绍过基于LSH模型推荐的主要逻辑,今天我们介绍一个基于余弦相似度的文本推荐实现,其中主要会有一些关键实现代码的介绍和主要函数、方法的说明。

我们对文章的内容作了分词处理,把分词后每篇文章的词列表做最原始的横向张开,一次在文章中出现多少次,在分词后的列表中就输出多少次。如下图,列表第一位是文章的唯一标识ID, 后续跟着每篇文章出现的每个词的重复N次。

我们对每一行的数据做处理,一共形成两个列,列名分别是article_id, words.

文章分词后的基本效果

1、HashingTF (特征HASH-频数)

HashingTF 是一个Transformer,在文本处理中,接收词条的集合然后把这些集合转化成固定长度的特征向量。这个算法在哈希的同时会统计各个词条的词频。因为数据量大的原因,把词hash到有限的空间里,但是一般针对于小数据量的话,直接不用此方法。

HashingTF 基本参数介绍
	val hashingTF = new HashingTF(Math.pow(2, 18).toInt)

	val newSTF = data.rdd.map{row =>
	  val tf = hashingTF.transform(row.getAs[String]("words").split(",").toList)
	  (row.getAs[Long]("article_id"), tf)
	}

上述代码处理后的数据基本输出如下,是(article_id, 稀疏向量)的格式。

MLlib的向量主要分为两种,DenseVector和SparseVector,前者是用来保存稠密向量,后者是用来保存稀疏向量。稀疏向量和密集向量都是向量的表示方法,我们用到的稀疏向量。

用稀疏格式表示为(262144,[15036,19364,47826,83116,97232,121425,148594,155955,178252,213110],[3.0,2.0,2.0,6.0,4.0,2.0,5.0,2.0,3.0,6.0]) 第一个262144表示向量的长度(元素个数),[15036,19364,47826,83116,97232,121425,148594,155955,178252,213110]就是indices数组,[3.0,2.0,2.0,6.0,4.0,2.0,5.0,2.0,3.0,6.0]是values数组 表示向量0的位置的值是3.0,2的位置的值是2.0,以此类推后续…,而其他没有表示出来的位置都是0

密集向量的值就是一个普通的Double数组 而稀疏向量由两个并列的 数组indices和values组成 例如:向量(1.0,0.0,1.0,3.0)用密集格式表示为[1.0,0.0,1.0,3.0]

(1254041,(262144,[15036,19364,47826,83116,97232,121425,148594,155955,178252,213110],[3.0,2.0,2.0,6.0,4.0,2.0,5.0,2.0,3.0,6.0]))
(1254702,(262144,[13250,45738,74579,83816,114706,140294,168038,178252,179286,228860],[6.0,2.0,3.0,2.0,3.0,2.0,4.0,2.0,3.0,2.0]))
(1270204,(262144,[23837,89849,118991,128753,141701,144329,154077,181588,206007,216577],[8.0,9.0,7.0,30.0,6.0,9.0,8.0,8.0,2.0,6.0]))
(1274990,(262144,[1825,47533,66014,72996,128753,137905,176245,230362,246875,261184],[2.0,4.0,2.0,6.0,7.0,2.0,3.0,3.0,5.0,4.0]))
(1275492,(262144,[453,3346,23862,48581,81453,97906,177907,192248,211981,222855],[5.0,7.0,2.0,6.0,7.0,3.0,8.0,4.0,2.0,5.0]))
(1276010,(262144,[24180,37181,38884,83812,139707,170385,183537,186838,210495,252680],[3.0,6.0,6.0,8.0,3.0,4.0,2.0,2.0,6.0,15.0]))
(1284572,(262144,[4080,34822,41345,46525,55782,114461,137175,137700,141004,257853],[5.0,9.0,9.0,12.0,16.0,9.0,20.0,4.0,9.0,19.0]))
(1285584,(262144,[50361,54739,60330,119368,127547,208981,219001,248279,255987,257167],[5.0,3.0,7.0,7.0,3.0,8.0,7.0,6.0,10.0,3.0]))
(1286389,(262144,[85281,89699,118991,133021,165941,179035,193877,224875,243109,255164],[2.0,4.0,3.0,2.0,5.0,3.0,3.0,2.0,4.0,2.0]))
(1305106,(262144,[30697,36373,65307,73404,82006,86639,123217,142848,168038,178252],[3.0,4.0,5.0,11.0,5.0,17.0,19.0,4.0,11.0,5.0]))
(1315406,(262144,[42610,64869,74579,134320,200367,205339,240419,241590,246953,259244],[7.0,6.0,6.0,11.0,6.0,2.0,7.0,5.0,3.0,10.0]))
(1321265,(262144,[38382,39825,48154,48615,156641,189005,191338,201265,216118,245329],[13.0,9.0,4.0,2.0,9.0,6.0,4.0,8.0,7.0,6.0]))
(1338600,(262144,[10344,36515,76239,82006,115775,148514,165030,190556,213678,226409],[8.0,7.0,9.0,3.0,6.0,3.0,25.0,4.0,11.0,3.0]))
(1343798,(262144,[17834,44919,75299,90387,112873,116611,124084,144329,232930,235221],[2.0,8.0,14.0,2.0,12.0,2.0,2.0,15.0,7.0,5.0]))
(1348953,(262144,[6878,15912,27529,32639,55938,129711,145474,200367,215919,232930],[7.0,7.0,5.0,13.0,7.0,14.0,13.0,14.0,17.0,10.0]))
(1349040,(262144,[32887,49573,55355,64662,111523,157281,228604,240842,252200,261899],[2.0,7.0,5.0,3.0,2.0,3.0,3.0,7.0,3.0,3.0]))
(1376022,(262144,[69651,86071,103299,150372,153082,163559,175435,218723,220682,253252],[3.0,6.0,7.0,9.0,7.0,6.0,3.0,6.0,9.0,2.0]))
(1404146,(262144,[17964,45478,98252,119251,146330,170312,192362,203817,215919,232930],[2.0,6.0,6.0,8.0,6.0,6.0,6.0,5.0,10.0,7.0]))
(1423152,(262144,[6878,47048,74579,110350,123974,137905,145474,200367,215919,242312],[3.0,6.0,13.0,11.0,6.0,6.0,9.0,18.0,12.0,2.0]))
(1439223,(262144,[65855,68559,71276,82006,115775,174947,202636,217083,226336,231174],[10.0,21.0,2.0,5.0,11.0,35.0,9.0,25.0,16.0,4.0]))

2、IDF model构建

为了方便展示推荐结果的效果,我们对数据集做过滤,只留下两篇文章,最终输出的结果是

[1254702,1423152,1315406,1305106]
[1270204,1286389,1274990,1343798]

们对第一行 1254702的分词数据做去重处理后 得到如下,虽然分词的效果很low,但是通过比较low的分词计算的结果是正常逻辑的。

1270204 [‘开发商’, ‘莆田’, ‘别墅’, ‘府邸’, ‘公寓’, ‘福建延寿山庄酒店管理有限公司’, ‘用地’, ‘藏珑’, ’10’, ‘1’, ‘独栋’, ‘建售’]
1315406 [‘普查’, ‘调控’, ‘时评’, ‘房产税’, ‘房屋’, ‘刚需’, ‘人口’, ‘存量’, ’10’, ‘1’, ‘房价’, ‘地毯式’]
1423152 [‘调控’, ‘房地产’, ‘长效’, ‘限贷’, ‘限购’, ‘蛮劲’, ’10’, ‘限价’, ‘1’, ‘房价’, ‘网民’, ‘楼市’]
1305106 [‘资金’, ‘房产’, ‘全款’, ‘背负’, ‘购房者’, ‘买房’, ‘房子’, ’10’, ‘一身’, ‘1’, ‘怎么’, ‘贷款’]

	//构建idf model
	val idf = new IDF().fit(newSTF.values)
	//将tf向量转换成tf-idf向量
	val newsIDF = newSTF.mapValues(v => idf.transform(v))

	val bIDF = spark.sparkContext.broadcast(newsIDF.collect())
    // 过滤最后的结果
	val docSims = newsIDF.filter(x => Seq(1254702, 1270204).toList.contains(x._1)).flatMap{
	  case (id1, idf1) =>
		val idfs = bIDF.value.filter(_._1 != id1)
		val sv1 = idf1.asInstanceOf[SV]
		//构建向量1
		val bsv1 = new SparseVector[Double](sv1.indices, sv1.values, sv1.size)
		//取相似度最大的前10个
		idfs.map {
		  case (id2, idf2) =>
			val sv2 = idf2.asInstanceOf[SV]
			//构建向量2
			val bsv2 = new SparseVector[Double](sv2.indices, sv2.values, sv2.size)
			//计算两向量点乘除以两向量范数得到向量余弦值
			val cosSim = bsv1.dot(bsv2) / (norm(bsv1) * norm(bsv2))
			(id1, id2, cosSim)
		}.sortWith(_._3>_._3).take(3)
	}.toDF("article_id1","article_id2", "score").
	  filter(x => x.getAs[Double]("score") > 0.0).
	  select($"article_id1",$"article_id2").
	  groupBy("article_id1").agg(functions.concat_ws(",", functions.collect_set($"article_id2".cast("string"))).as("ids"))
    docSims.rdd.foreach(println)