python 爬虫 闲话 基础入门

说在前面的话

  • python在自身历史版本中主要是python2和python3。我们不去表述各种的python实现,用C(CPython)\C++(Pyston)\Java(Jython)\C#(IronPython)\Ruby\JS都有, 我们只去说一下基于python3的最基础的使用和应用。当然很多的教程是以python2来写的,如果我们遇到其实并不影响我们去学习。
  • 浏览器主要是Chrome

一钱思路

  1. 拿到http://**.com/path/to/page 的地址, 将地址在浏览器(Chrome)中打开
  2. 在浏览器加载完地址且能看到你想看到的页面后,按F12 或者 直接在页面 右键=> 点击检查 ,页面会在某一个侧(默认是页面的右侧或者下侧)弹出如下图的 开发者工具, 图中的标红的箭头、 Elements、Network很关键,几乎是囊括了web爬虫所需的所有前置条件。
Chrome开发者工具
  1. 一般是箭头和Elements组合来使用,使用的目的是什么?就是确定你要爬取的数据在页面的什么地方。 步骤是:先点击Elements,它的紧下侧会有强调横线出现,同时整体下方出现的是html源码页面,看到<!DOCTYPE html> 就是了; 然后点击最左侧的箭头,箭头的颜色会变成蓝色或者颜色加重,随后把鼠标慢慢挪到真正的页面内容上,就会发现不一样了,这个时候html 源码页面的光标会随着你的鼠标在实际的可见页面移动而移动。而且点击一下实际可见页面的随意内容,html源码的光标就会立刻停在源码的位置, 实际页面不会发生变化。
  2. Elements + 箭头的 页面定位操作, 每一次定位 箭头都要从新点击一次。
  3. 对于Network点击后看到的是页面加载时的所有页面接口,如果说Elements + 箭头对应的静态页面内容的话,Network对应的就是动态加载的数据;明确哪些数据是有哪个接口提供的以及这个接口需要什么参数、cookie等关键性的请求依据都是从Network获取的
  4. 如果在浏览器中输入的地址没有得到具体的页面,而是一个登陆页面,此时就需要箭头+Elements+Network一起上了, 先用箭头+Elements明确登陆页面的需要输入的元素在哪里,在葱Network中确定登陆接口;如果登陆需要验证码还要从第三方的接口或者验证码的破解来辅助登陆。
  5. 以上都是基于浏览器可以直接看到数据的情况,如果页面是纯粹的动态加载,那么就要涉及到对页面js的逆向工程及更深层次的技术,此处不做深入谈论; 而如何确定页面是不是动态加载的呢?这个可以在Elements中找到答案, 去Elements的html页面查询真实页面的数据项,如果查不到基本上可以确定数据项是动态加载的, 这个时候需要去Network中找跟我们地址url对应到接口来确定页面数据是由哪个接口来动态提供的。如果只是简单需求可以用Selenium解决这个问题

二两概念

  1. request 和 response是什么?

request :可以通俗理解成对页面发起网络请求response: 可以通俗理解成对网页请求后,响应返回的数据request和response在很多类或者方法定义的主要关键字,又或者是约定成俗的写法; 在爬虫任务进行时的信息数据的两个不同的流向代表。

  1. 能抓取怎样的数据?

网页文本、图片、视频、二进制流

  1. 解析方式有哪些?

直接处理、json解析、正则表达式、beautifulsoup、pyquery 、 Xpath

  1. 怎样解决js渲染的问题?

分析Ajax请求、用selenium / webdriver、Splash、Pyv8

三分技术

  1. requests (请求页面)
  1. xpath (解析页面)
  • xpath是一门在 XML 文档中查找信息的语言,说的通俗点就是我们爬取的网页是一种有着基本款式和样式的本文信息, xpath就可以解析这些信息,从而取到我们想要的内容。
  • xpath是一种技术,在python中常用的库是 lxml
  • 为啥要用xpath先在页面搞一下? 因为我们不知道页面是什么样的,那么爬取到数据后我们也没有办法知道取哪里的信息为我所用,所有要运行所有的爬取程序之前先用浏览器目测一下具体哪些内容可以怎么接下下来。
  • xpath入门连接: https://www.runoob.com/xpath/xpath-tutorial.html
  • 实际使用中, 以chrome浏览器为例,chrome浏览器是有自己的插件的,这个插件叫做XPath Helper, 它的作用是直接从页面可以快速的获取页面中我们想要的内容的xpath路径。如果是其它浏览器的基本操作前边也都是一样的,只是需要人工来找到想要内容的xpath路径,这样就需要爬虫代码的编写者非常熟悉xpath语法了。
  • 无论是否是chrome浏览器,在看一个页面的元素xpath路径时都是有一个基本的步骤的
1、用浏览器(IE 和 chrome为例)打开想要爬取的页面地址
2、页面上任何位置 右键 --> 检查 (检查元素)  或者直接按 F12 (不同浏览器说辞可能不一样,但是含义是相同的),会看到浏览器的下方或者右侧弹出如下两张图的界面,最左侧的箭头+方块的图标选中后就可以在页面选择元素了,也可以直接在弹出的界面中的页面源码中选择元素
import requests
from lxml import etree
if __name__ == __main__:
    headers = {
        User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36
    }
    url = https://anqiu.58.com/ershoufang/
    page_text = requests.get(url = url,headers = headers).text
    tree = etree.HTML(page_text)
    li_List = tree.xpath(//section【@class = list】/div)
    fp = open(58.txt,w,encoding=utf-8)
    for li in li_List:
        title = li.xpath(./a/div【2】//div/h3/text())【0】
        print(title)
        fp.write(title+\n)
  1. 正则表达式 (解析页面)
    1. 通过网页请求requests返回的数据取得text文本内容
    2. 通过python的re 正则库,可以对文本内容进行正则匹配从而解析到想要的数据内容
  1. BeautifulSoup4 (解析页面)
  • BeautifulSoup也是python的一个库,类似于xpath同样是对页面进行解析,实现原理有所不同,它是基于DOM的,解析后可以按照DOM树对页面进行提取数据,懂得css等前端技术的人用起来会更顺手,且相对于xpath要API非常人性化,支持css选择器
  • 灵活性和性能来说bs4没有xpath好用
  • 文档传送门: https://beautifulsoup.readthedocs.io/zh_CN/v4.4.0/
  1. Selenium: 它本身是Web应用程序测试的工具,直接在浏览器中运行, 所以它可以做到像浏览器一样把数据全部都加载出来,这样一来哪怕是动态加载也是可以拿到加载后的数据的。它本身是专门针对测试的工具 ,所有还是测重测试方向,官网传送门: https://www.selenium.dev/;而python调用selenium的API文档可以在这里看到 https://selenium-python.readthedocs.io/ 网站中有详细的说明以及对Drivers的下载链接地址。
  2. Scrapy 分布式爬虫框架,

主要步骤: 创建工程、爬虫文件、执行

  • scrapy startproject xxxPro
  • 在项目目录下输入: scrapy genspider spiderName www.XXX.com 创建爬虫文件
  • 框架会自动创建所有需要的items pipline settings.py 中间件等默认文件,而开发者只需要去根据需求去修改对应的文件来完成整个爬虫任务的构建即可
  • 在项目目录下执行: scrapy crawl spiderName

LinkedInAttic ScANNS 近似邻近搜索 Spark LSH 替换方案

LinkedInAttic ScANNS 是近似邻近搜索的有效方案,对于使用spark LSH模型遇到的一些瓶颈有着天然高校的替换效果。笔者在以前的文章中介绍过Spark LSH的基本情况和主要的缺点,同时对于余弦相似的手动编写代码来实现近似效果的案例我们也介绍过。

不管哪一种情况,在于实际项目中的模型计算效果、spark集群的资源利用率、时间上的消耗等都是差强人意的,尤其是笔者自己的项目某次不稳定的模型运行导致集群的大量资源占用效果,对于实际工程的影响很大。

笔者痛定思痛,把Spark LSH的相似计算过程用 LinkedInAttic ScANNS 替换,以此做基本的记录。

spark 码农

1、原Spark LSH模型计算

  #val string_to_vector = functions.udf((x: String) =>{
  #	Vectors.dense(x.split(",").map(x => x.toDouble))
  #})

val w2vDf = spark.sql(s"select site_id, article_id, vectors from ${word2vec_tb}").
	  withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)
	val brp = new BucketedRandomProjectionLSH()
	  .setBucketLength(bucket_number)
	  .setNumHashTables(hash_number).
	  setSeed(seedNum)
	  .setInputCol("wordvec")
	  .setOutputCol("hashes")
	val brpModel = brp.fit(w2vDf)

对于模型的输入数据主要是分词后的数据集,而分词效果在余弦相似度的文章中有提及,(对于直接存储分词结果的数据落盘方案笔者是不提倡的,而是应该将分词后的所有数据全部都数字化,转成自定义词典的ID值来做后续的所有运算。)

2、原相似结果计算

# 加载LSH模型计算结果
val df = spark.sql(s"select site_id,article_id, vectors, hash_values from ${brpModeltb}").
	  withColumn("hashes", StringToDenseVector($"hash_values")).
	  withColumn("wordvec", string_to_vector($"vectors")).persist(StorageLevel.MEMORY_AND_DISK_SER)

# 计算JaccardDistance
val brpDf =  itemModel.approxSimilarityJoin(df, df, threshold , "JaccardDistance").persist(StorageLevel.MEMORY_AND_DISK)

log.info(s"save doc similarity result to ${dsimtb}")
# 计算全量相似结果
  brpDf.select($"datasetA".getField("article_id").as("article_id"),
		$"datasetA".getField("site_id").as("site_id"),
		$"datasetB".getField("article_id").as("id"),
		$"datasetB".getField("site_id").as("city_id"),
		$"JaccardDistance").
		filter(row => row.getAs[Long]("article_id") != row.getAs[Long]("id")  &&
		  row.getAs[Long]("site_id") == row.getAs[Long]("city_id")).
		select($"article_id",$"id",$"site_id").
		groupBy("article_id", "site_id").
		agg(functions.concat_ws(",", functions.collect_set($"id".cast("string"))).as("ids")).
		withColumn("ids", TopN_Sims(num)($"ids")).
		filter(row => !row.getAs[String]("ids").equals("")).write.mode("overwrite").saveAsTable(dsimtb)

上述代码是对全量数据做了相似计算并生成结果。其中有一些字段说明一下,site_id : 城市, article_id: 文章id, vectors和hash_values是做了 HashingTF 之后的LSH模型结算结果

3、原增量相似结果计算

由于Spark LSH的全量计算比较耗时且费资源,故做了增量的计算调度过程,但是正是Spark LSH对于鸡肋的增量计算进一步加大了它低效的定位。

# 加载LSH 模型 并广播模型
val itemModel = BucketedRandomProjectionLSHModel.load(brpModelpath)
val _model = spark.sparkContext.broadcast(itemModel)
# 加载增量数据
val new_sql = s"SELECT  site_id, article_id from  ${ODS_ARTICLE_SITE} WHERE  FROM_UNIXTIME(create_time,'yyyy-MM-dd') ='${dt}' AND status=1 AND type=0 AND site_id IN (${city_values}) "

	val articles = spark.sql(new_sql).join(df,Seq("site_id", "article_id"),"left").
	  filter(row => null != row.getAs("vectors")).rdd.map{row =>
	  row.getAs[Long]("site_id").toString + "#" +
		row.getAs[Long]("article_id").toString + "#" +
		row.getAs[String]("vectors")
	}.collect().toList
# 计算增量的相似结果
 articles.map{line =>
		val values = line.split("#")
		val site_id = values(0)
		val article_id = values(1)
		val vecs = values(2).split(",").map(x => x.toDouble)
		//	  val key = Vectors.sparse(values(1).toInt,values(3).split(",").map(x => x.toInt) ,values(3).split(",").map(x => x.toDouble))
		val key = Vectors.dense(vecs)
		val dd = _model.value.approxNearestNeighbors(df, key , (num * 1.5).toInt ).toDF().
		  filter(nrow => !nrow.getAs[Long]("article_id").toString.equals(article_id) &&
			nrow.getAs[Long]("site_id").toString.equals(site_id)).
		  select("article_id").rdd.map(x => x.getAs[Long]("article_id").toString).collect().toList
		(site_id, article_id, dd.mkString(","))
	  }.toDF("site_id","article_id","ids").
		filter(rr => !rr.getAs[String]("ids").equals("")).
		write.mode("overwrite").saveAsTable(dsimtb)

4、LinkedInAttic ScANNS 替换 Spark LSH

# 新的模型
val model = new CosineSignRandomProjectionNNS()
	  .setNumHashes(num_hash)
	  .setSignatureLength(sig_nature_length)
	  .setJoinParallelism(joinParallelism)
	  .setBucketLimit(bucket_limit)
	  .setShouldSampleBuckets(true)
	  .setNumOutputPartitions(num_partitions)
	  .createModel(dimension_len)
# 加载词向量
	val w2v = w2vDf.select("article_id","wordvec").
	  filter(x=> null != x.getAs("article_id") && null != x.getAs("wordvec") &&
		x.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector].numNonzeros >0).
	  rdd.map{row=>
	  (row.getAs[Long]("article_id"), row.getAs("wordvec").asInstanceOf[org.apache.spark.ml.linalg.Vector])
	}.persist(StorageLevel.MEMORY_AND_DISK_SER)
# 生成相似结果
val nbrs = model.getSelfAllNearestNeighbors(w2v, numCandidates).toDF("article_id","oid","score").persist(StorageLevel.MEMORY_AND_DISK_SER)
	nbrs.join(w2vDf, Seq("article_id"), "left").
	  select("article_id","oid","site_id").
	  withColumnRenamed("site_id","city_id").
	  join(ods, Seq("oid"),"left").select("article_id","oid","city_id","site_id").
	  filter(x=> null != x.getAs("city_id") && null != x.getAs("site_id") &&
		x.getAs[Long]("city_id") == x.getAs[Long]("site_id")).select("article_id","oid").
	  groupBy("article_id").agg(functions.concat_ws(",", functions.collect_set($"oid".cast("string"))).as("ids")).
	  join(w2vDf,Seq("article_id"),"left").select("article_id","site_id","ids").
	  withColumn("article_id", $"article_id".cast("long")).withColumn("site_id", $"site_id".cast("long"))
	  .write.mode("overwrite").
	  saveAsTable(dsimtb)

整体的替换还是比较顺畅的,有一点需要主要 LinkedInAttic ScANNS 和scala的版本对应情况,有的版本编译后运行模型会出现问题。笔者在之前的文章中提到过具体的编译和错误处理方案,以及相关的版本号。

scala add elements 集合添加元素

scala add elements to List or Set,集合添加元素是scala预约经常遇到的操作。。笔者只对该两个集合的基本操作做入门级实验,具体应用中的细节不做过多的说明,请读者自行多多验证,多动手才能卷到往生。

默认情况下,scala 的List 和 Set 都是不变的,如果新增元素或者拼接新的集合其实都是生成了新的实例

List集合添加元素的一些操作

    val list1 =List(4,5,6,7,8)
    println(s"list1 = ${list1}")
    // 增加元素
    val list2 = 1::list1.::(9)
    println(s" :: 主动调用和被动添加 都放在集合前; list2 = ${list2}")
    val list3 = 10+:list1 :+ 11
    println(s"+: :+ 添加到集合前 集合后; list3 = ${list3}")
    // list 集合拼接
    val  list4 = list2 ++ list3
    println(s"两个集合拼接成新的集合; list4 = ${list4}")
    // list集合与list集合的元素拼接
    val list5 = list1 ::: list3
    println(s"list1 ::: list3 = ${list5}")
    val shuzu = Array(1, 3, 4) ++ Array(55,33 ,55)
    println(s"::: 只能用在List , 而++还可以用在 Array 数组: ${ shuzu.mkString(",") }")
list1 = List(4, 5, 6, 7, 8)
 :: 主动调用和被动添加 都放在集合前; list2 = List(1, 9, 4, 5, 6, 7, 8)
+: :+ 添加到集合前 集合后; list3 = List(10, 4, 5, 6, 7, 8, 11)
两个集合拼接成新的集合; list4 = List(1, 9, 4, 5, 6, 7, 8, 10, 4, 5, 6, 7, 8, 11)
list1 ::: list3 = List(4, 5, 6, 7, 8, 10, 4, 5, 6, 7, 8, 11)
::: 只能用在List , 而++还可以用在 Array 数组: 1,3,4,55,33,55

我们在以前的文章中有学习到Array的一些入门操作。读者可以移步自赏

Set集合添加元素的操作

scala set
默认set是不可变的
上图的输出结果
Set()
Set(5)

在上图中我们可以看到set1默认是空集合,它不可以添加元素;可能会有人对+号操作有误解, + 号操作返回的是一个新集合,并不是对set1的修改。而且我们定义set1用的是关键字val 那么set1对象实例本身就是不可以修改的。

    import scala.collection.mutable.Set
    val set1 = Set[Int]()
    set1.add(4)
    set1.add(4)
    set1 += 3
    set1.add(5)
    println(s"before add 6 , set1 = ${set1}")
    set1.add(6)
    println(s"after add 6 , set1 = ${set1}")
    set1.remove(4)
    println(s"after remove 4 , set1 = ${set1}")
    set1 ++= Vector(7, 8)
    println(s"after ++=Vector , set1 = ${set1}")
before add 6 , set1 = Set(5, 3, 4)
after add 6 , set1 = Set(5, 6, 3, 4)
after remove 4 , set1 = Set(5, 6, 3)
after ++=Vector , set1 = Set(5, 6, 3, 7, 8)