pandas dataframe for loop 循环初探

DataFrame直接使用for循环时,按以下顺序获取列名;使用iteritems()方法,可以获取列名称和pandas.Series类型的元组,元组对应每个列的数据;使用iterrows()方法,可以获得每一行的数据(pandas.Series类型)和列名与具体值的元组

pandas dataframe 是表格型的数据结构,它含有一组有序的列,每列可以是不同的值类型。具体的介绍不做细说,直奔中心问题,一个dataframe怎么做循环?

以下面代码实例为具体介绍,从代码中我们可以看到dataframe的具体创建方式之一,而且是指定了index,当然我们可以不指定index而是通过后续的set index来修改index的值,这些操作我们可以通过之前的篇幅来具体了解。

import pandas as pd

df = pd.DataFrame({'age': [24, 42, 18, 60],
                   'location': ['beijing', 'shanghai', 'beijing', 'shenzhen'],
                   'score': [74, 92, 88, 99],
                   'name': ['huahua', 'mingming', 'laowang', 'xiaowang']},
                  index=['1', '2', '3', '4'])
print(df)
   age  location      name  score
1   24   beijing    huahua     74
2   42  shanghai  mingming     92
3   18   beijing   laowang     88
4   60  shenzhen  xiaowang     99

直接对dataframe做for loop 或者 调用__iter__

直接对dataframe做for循环和调用它的__iter__()方法效果是一样的。都是顺序获取列名。

for d in df:
    print(d)
    print("=====")
for d in df.__iter__():
    print(d)
    print("============")
age
=====
location
=====
name
=====
score

DataFrame.iteritems() 逐列检索

使用iteritems()方法,可以获取列名称和pandas.Series类型的元组,元组对应每个列的数据;

for d in df.iteritems():
    print(d)
    print(type(d))
    print("=========")

我们截取部分输出可以看到,iteritems循环得到的是一个个的tupel对象,而tuple第一个值是str类型的对应的列名称;tuple的第二个值是一个pandas.Series类型的对象,报名此列所有行的索引值以及其具体的列值

('age', 1    24
2    42
3    18
4    60
Name: age, dtype: int64)
<class 'tuple'>
=========
('location', 1     beijing
2    shanghai
3     beijing
4    shenzhen
Name: location, dtype: object)
<class 'tuple'>
=========
dataframe iteritems方法循环

DataFrame.iterrows() 逐行检索

使用iterrows()方法,可以获得每一行的数据(pandas.Series类型)和列名与具体值的元组

for d in df.iterrows():
    print(d)
    print(type(d))
    print("=========")

同样我们从部分的输出结果可以看出来,iterrows的效果是和iteritems相对的。iterrows方法返回的依旧是一个tuple对象,而tuple的第一个值是对应的该行所对应的index值,而tuple第二个值依旧是一个pands.Series类型的对象,该Series包含的此行对应所有列的列名及其具体的值。

('1', age              24
location    beijing
name         huahua
score            74
Name: 1, dtype: object)
<class 'tuple'>
=========
('2', age               42
location    shanghai
name        mingming
score             92
Name: 2, dtype: object)
<class 'tuple'>
=========

其实dataframe还有其它的循环方法,具体的内容此篇不去复述了,平时我们处理数据用到比较多的就是以上几种情况。

scala spark dataframe 转置 透视表 行转列 列转行

大数据方向的 分析师 利用scala spark dataframe 进行表的转置,透视表,行专列,列转行,合并列,多行合并等操作

scala spark dataframe 是大数据从业者必须掌握的基础知识点。 对于数据分析人员经常会有对数据表进行转置/透视的需求,还有对数据进行行转列,列转行,多列多行合并的处理。无论哪种情况,对于常年使用SQL语句来进行数据分析的技术人员应该都是信手拈来的操作,但是对于很大使用大数据组件Spark来进行数据处理和分析的选手来说可能就需要了解除了常规SQL之外的操作方式了。

数据样例

val spark = SparkSession
	  .builder()
	  .appName("HelloWorld")
	  .getOrCreate()
	import spark.implicits._

	val data = spark.sparkContext.parallelize(List(
	  ( 1, "张三", 88.0, "Mat"),
	  ( 2, "李四", 67.0, "Mat"),
	  ( 3, "赵六", 77.0, "Mat"),
	  ( 4, "王五", 65.0, "Mat"),
	  ( 1, "张三", 77.0, "Eng"),
	  ( 2, "李四", 90.0, "Eng"),
	  ( 3, "赵六", 24.0, "Eng"),
	  ( 4, "王五", 90.0, "Eng"),
	  ( 1, "张三", 33.0, "Chinese"),
	  ( 2, "李四", 87.0, "Chinese"),
	  ( 3, "赵六", 92.0, "Chinese"),
	  ( 4, "王五", 87.0, "Chinese")
	  ))
	val df = data.toDF("id","name","scores","class_names").cache()
	df.show(false)
	val _cla = df.select("class_names").distinct().rdd.map(_ (0).toString).collect()
	print(_cla.toList)
# dataframe show
+---+----+------+-----------+
|id |name|scores|class_names|
+---+----+------+-----------+
|1  |张三  |88.0  |Mat        |
|2  |李四  |67.0  |Mat        |
|3  |赵六  |77.0  |Mat        |
|4  |王五  |65.0  |Mat        |
|1  |张三  |77.0  |Eng        |
|2  |李四  |90.0  |Eng        |
|3  |赵六  |24.0  |Eng        |
|4  |王五  |90.0  |Eng        |
|1  |张三  |33.0  |Chinese    |
|2  |李四  |87.0  |Chinese    |
|3  |赵六  |92.0  |Chinese    |
|4  |王五  |87.0  |Chinese    |
+---+----+------+-----------+
# 三门课程
List(Eng, Chinese, Mat)

转置/透视表 pivot unpivot 一行转多列

以下代码是通过dataframe的pivot方法,按照名字分组,分组后去第一个分数;最终实现转置表或者是透视表的效果。

	val _pivot = df.groupBy("name").pivot("class_names", _cla).agg(functions.first("scores"))
	_pivot.show(false)
+----+----+-------+----+
|name|Eng |Chinese|Mat |
+----+----+-------+----+
|王五  |90.0|87.0   |65.0|
|李四  |90.0|87.0   |67.0|
|赵六  |24.0|92.0   |77.0|
|张三  |77.0|33.0   |88.0|
+----+----+-------+----+

而如果想要把透视表逆转回去就要用hive 的stack函数来实现,而spark dataframe中没有相关的方法,具体实现如下两种实现方式的代码;但是有两个知识点要注意下

  • 数字一定要能被后边参数名称去重后的个数整出, 如代码中’Analytics’, Mat, ‘BI’, Eng, ‘Ingestion’, Chinese ,去重后肯定能整除3;
  • stack从第二个参数开始两两一组,’Analytics’, 和 Mat, 分别是变成行之后的具体数值 ,以及之前的字段名是什么,读者可以从下边的show详情中看到。
  • 如果字段名称有中文,要使用反引号**`** 把字段包起来;如”stack(2, ‘数学’, `数学`,’英语’,`英语`), 第一个数学是单引号,第二个是反引号,第一个是要最终展示的文本,第二个是字段名
_pivot.select($"Name", 
unctions.expr("stack(3, 'Analytics', Mat, 'BI', Eng, 'Ingestion', Chinese) as (kecheng, fenshu)")).show(false)
# 相同效果的实现方式
//	_pivot.selectExpr("name","stack(3, 'Analytics', Mat, 'BI', Eng, 'Ingestion', Chinese) as (kecheng, fenshu)").show(false)
+----+---------+------+
|Name|kecheng  |fenshu|
+----+---------+------+
|王五  |Analytics|65.0  |
|王五  |BI       |90.0  |
|王五  |Ingestion|87.0  |
|李四  |Analytics|67.0  |
|李四  |BI       |90.0  |
|李四  |Ingestion|87.0  |
|赵六  |Analytics|77.0  |
|赵六  |BI       |24.0  |
|赵六  |Ingestion|92.0  |
|张三  |Analytics|88.0  |
|张三  |BI       |77.0  |
|张三  |Ingestion|33.0  |
+----+---------+------+

行专列 多行合并成一列

df.groupBy("name").agg(functions.concat_ws(",",functions.collect_set($"scores".cast("string"))).as("fenshu")).show(false)
+----+--------------+
|name|fenshu        |
+----+--------------+
|王五  |90.0,65.0,87.0|
|李四  |67.0,90.0,87.0|
|赵六  |77.0,92.0,24.0|
|张三  |88.0,77.0,33.0|
+----+--------------+

列转行 一列转多行

通过functions的split 方法把字符串类型的列先转换成Array类型,之后在通过explode方法把Array转换成多行

import org.apache.spark.sql.functions
val res = df.groupBy("name").agg(functions.concat_ws(",",functions.collect_set($"scores".cast("string"))).as("fenshu"))
	res.show(false)
	res.select($"name", functions.explode(functions.split($"fenshu", ",")).as("score")).show(false)
# 多个数值在一列中
+----+--------------+
|name|fenshu        |
+----+--------------+
|王五  |90.0,65.0,87.0|
|李四  |67.0,90.0,87.0|
|赵六  |77.0,92.0,24.0|
|张三  |88.0,77.0,33.0|
+----+--------------+
# 一列数据拆分成多行
+----+-----+
|name|score|
+----+-----+
|王五  |90.0 |
|王五  |65.0 |
|王五  |87.0 |
|李四  |67.0 |
|李四  |90.0 |
|李四  |87.0 |
|赵六  |77.0 |
|赵六  |92.0 |
|赵六  |24.0 |
|张三  |88.0 |
|张三  |77.0 |
|张三  |33.0 |
+----+-----+

spark dataframe select 源码 对源码的小小窥探

spark dataframe 和 dataset 都是spark的数据集合抽象,而dataframe和dataset的关系可以用一行代码来说明,dataframe就是dataset泛化成Row的一种特例。当然本篇的重点不是为了讨论这两个数据集,而是通过对dataframe的select方法源码阅读来给读者提供一个队源码轻读的思路。

DataFrame = Dataset[Row]

dataframe的 select是一个Transformation,这个也不是我们的讨论重点,加上这句话纯粹为本站内部引流

spark dataframe select is a transformation

源码的版本gradle dependencies

dependencies {
    compile 'org.scala-lang:scala-library:2.11.8'
    compileOnly 'org.apache.spark:spark-core_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-sql_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-mllib_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-streaming_2.11:2.3.0'
}

源码内容

  /**
   * Selects a set of column based expressions.
   * {{{
   *   ds.select($"colA", $"colB" + 1)
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(cols: Column*): DataFrame = withPlan {
    Project(cols.map(_.named), planWithBarrier)
  }

  /**
   * Selects a set of columns. This is a variant of `select` that can only select
   * existing columns using column names (i.e. cannot construct expressions).
   *
   * {{{
   *   // The following two are equivalent:
   *   ds.select("colA", "colB")
   *   ds.select($"colA", $"colB")
   * }}}
   *
   * @group untypedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

  /**
   * :: Experimental ::
   * Returns a new Dataset by computing the given [[Column]] expression for each element.
   *
   * {{{
   *   val ds = Seq(1, 2, 3).toDS()
   *   val newDS = ds.select(expr("value + 1").as[Int])
   * }}}
   *
   * @group typedrel
   * @since 1.6.0
   */
  @Experimental
  @InterfaceStability.Evolving
  def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
    implicit val encoder = c1.encoder
    val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
      planWithBarrier)

    if (encoder.flat) {
      new Dataset[U1](sparkSession, project, encoder)
    } else {
      // Flattens inner fields of U1
      new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1)
    }
  }

scala.annotation.varargs

@scala.annotation.varargs — Var-Args函数(方法)意味着采用可变数量的参数。

Var-Args Functions(Methods) means taking Variable Number of Arguments. As a Java Developer, We have already tasted usage of Var-Args methods in Java-based Applications.

Scala also follows similar kind of syntax to define Var-Args parameter. But it uses * to define Var-Args parameter.–>Scala还遵循类似的语法来定义Var-Args参数。 但是它使用*定义Var-Args参数。

scala 可变参数

annotation 注解

advanced usages: 高级用法; experimental :实验性
  1. 这个包的作用在包的注释中已经做了最好的说明,标记一些api是实验性 或者 声明一些高级用法.
  2. 它主要利用java.lang.annotation来声明了一些自身的注解类或者接口
  3. public class InterfaceStability, 其中有三个接口分别是:
    (a). public @interface Stable {}; //Stable APIs
    (b). public @interface Evolving {}; //APIs that are meant to evolve towards becoming stable APIs, but are not stable APIs yet
    (c). public @interface Unstable {}; // Unstable APIs, with no guarantee on stability.

强弱类型

@group untypedrel 弱类型转换 | @group typedrel 强类型转换

在Saprk的结构化API中,可以分成两类,“无类型(untyped)”的DataFrame API和“类型化(typed)”的Dataset API.
确切的说Dataframe并不是”无类型”的, 它们有类型,只是类型检查没有那么严格,只检查这些类型是否在 运行时(run-time) 与schema中指定的类型对齐。
Dataset在 编译时(compile-time) 就会检查类型是否符合规范。
Dataset API仅适用于 基于JVM的语言(Scala和Java)。我们可以使用Scala 中的case class或Java bean来进行类型指定。

简单来说,如果转换是弱类型的,它将返回一个Dataframe(确切的说弱类型转换的返回类型还有 Column, RelationalGroupedDataset, DataFrameNaFunctions 和 DataFrameStatFunctions 等),而强类型转换返回的是一个Dataset。

星号的作用

select(cols: Column*) 参数带星号,在调用方式可以采用 冒号下划线星的方式把列表整个传入到方法中,如

select(cols_list.head, cols_list.tail:_*)

泛型类

[U1] : 泛型类, 是scala 语法的 泛化, 类似于java 的尖括号 <>;而UML建模语言中,是一种继承关系,表示一般与特殊的关系,它指定了子类如何特化父类的所有特征和行为。

TypedColumn

TypedColumn  是spark.sql.Column 的 伴生类。TypedColumn is a Column for the types of the input and the output. 也就是我们需要去spark.sql.Column中去看 TypedColumn  的具体实现,而我们在这个方法中就可以看到withInputType是它的内部方法。而它具体做了什么?有什么功能不在此处详细细述。