scala spark dataframe 是大数据从业者必须掌握的基础知识点。 对于数据分析人员经常会有对数据表进行转置/透视的需求,还有对数据进行行转列,列转行,多列多行合并的处理。无论哪种情况,对于常年使用SQL语句来进行数据分析的技术人员应该都是信手拈来的操作,但是对于很大使用大数据组件Spark来进行数据处理和分析的选手来说可能就需要了解除了常规SQL之外的操作方式了。
数据样例
val spark = SparkSession
.builder()
.appName("HelloWorld")
.getOrCreate()
import spark.implicits._
val data = spark.sparkContext.parallelize(List(
( 1, "张三", 88.0, "Mat"),
( 2, "李四", 67.0, "Mat"),
( 3, "赵六", 77.0, "Mat"),
( 4, "王五", 65.0, "Mat"),
( 1, "张三", 77.0, "Eng"),
( 2, "李四", 90.0, "Eng"),
( 3, "赵六", 24.0, "Eng"),
( 4, "王五", 90.0, "Eng"),
( 1, "张三", 33.0, "Chinese"),
( 2, "李四", 87.0, "Chinese"),
( 3, "赵六", 92.0, "Chinese"),
( 4, "王五", 87.0, "Chinese")
))
val df = data.toDF("id","name","scores","class_names").cache()
df.show(false)
val _cla = df.select("class_names").distinct().rdd.map(_ (0).toString).collect()
print(_cla.toList)
# dataframe show
+---+----+------+-----------+
|id |name|scores|class_names|
+---+----+------+-----------+
|1 |张三 |88.0 |Mat |
|2 |李四 |67.0 |Mat |
|3 |赵六 |77.0 |Mat |
|4 |王五 |65.0 |Mat |
|1 |张三 |77.0 |Eng |
|2 |李四 |90.0 |Eng |
|3 |赵六 |24.0 |Eng |
|4 |王五 |90.0 |Eng |
|1 |张三 |33.0 |Chinese |
|2 |李四 |87.0 |Chinese |
|3 |赵六 |92.0 |Chinese |
|4 |王五 |87.0 |Chinese |
+---+----+------+-----------+
# 三门课程
List(Eng, Chinese, Mat)
转置/透视表 pivot unpivot 一行转多列
以下代码是通过dataframe的pivot方法,按照名字分组,分组后去第一个分数;最终实现转置表或者是透视表的效果。
val _pivot = df.groupBy("name").pivot("class_names", _cla).agg(functions.first("scores"))
_pivot.show(false)
+----+----+-------+----+
|name|Eng |Chinese|Mat |
+----+----+-------+----+
|王五 |90.0|87.0 |65.0|
|李四 |90.0|87.0 |67.0|
|赵六 |24.0|92.0 |77.0|
|张三 |77.0|33.0 |88.0|
+----+----+-------+----+
而如果想要把透视表逆转回去就要用hive 的stack函数来实现,而spark dataframe中没有相关的方法,具体实现如下两种实现方式的代码;但是有两个知识点要注意下
- 数字一定要能被后边参数名称去重后的个数整出, 如代码中’Analytics’, Mat, ‘BI’, Eng, ‘Ingestion’, Chinese ,去重后肯定能整除3;
- stack从第二个参数开始两两一组,’Analytics’, 和 Mat, 分别是变成行之后的具体数值 ,以及之前的字段名是什么,读者可以从下边的show详情中看到。
- 如果字段名称有中文,要使用反引号**`** 把字段包起来;如”stack(2, ‘数学’, `数学`,’英语’,`英语`), 第一个数学是单引号,第二个是反引号,第一个是要最终展示的文本,第二个是字段名
_pivot.select($"Name",
unctions.expr("stack(3, 'Analytics', Mat, 'BI', Eng, 'Ingestion', Chinese) as (kecheng, fenshu)")).show(false)
# 相同效果的实现方式
// _pivot.selectExpr("name","stack(3, 'Analytics', Mat, 'BI', Eng, 'Ingestion', Chinese) as (kecheng, fenshu)").show(false)
+----+---------+------+
|Name|kecheng |fenshu|
+----+---------+------+
|王五 |Analytics|65.0 |
|王五 |BI |90.0 |
|王五 |Ingestion|87.0 |
|李四 |Analytics|67.0 |
|李四 |BI |90.0 |
|李四 |Ingestion|87.0 |
|赵六 |Analytics|77.0 |
|赵六 |BI |24.0 |
|赵六 |Ingestion|92.0 |
|张三 |Analytics|88.0 |
|张三 |BI |77.0 |
|张三 |Ingestion|33.0 |
+----+---------+------+
行专列 多行合并成一列
df.groupBy("name").agg(functions.concat_ws(",",functions.collect_set($"scores".cast("string"))).as("fenshu")).show(false)
+----+--------------+
|name|fenshu |
+----+--------------+
|王五 |90.0,65.0,87.0|
|李四 |67.0,90.0,87.0|
|赵六 |77.0,92.0,24.0|
|张三 |88.0,77.0,33.0|
+----+--------------+
列转行 一列转多行
通过functions的split 方法把字符串类型的列先转换成Array类型,之后在通过explode方法把Array转换成多行
import org.apache.spark.sql.functions
val res = df.groupBy("name").agg(functions.concat_ws(",",functions.collect_set($"scores".cast("string"))).as("fenshu"))
res.show(false)
res.select($"name", functions.explode(functions.split($"fenshu", ",")).as("score")).show(false)
# 多个数值在一列中
+----+--------------+
|name|fenshu |
+----+--------------+
|王五 |90.0,65.0,87.0|
|李四 |67.0,90.0,87.0|
|赵六 |77.0,92.0,24.0|
|张三 |88.0,77.0,33.0|
+----+--------------+
# 一列数据拆分成多行
+----+-----+
|name|score|
+----+-----+
|王五 |90.0 |
|王五 |65.0 |
|王五 |87.0 |
|李四 |67.0 |
|李四 |90.0 |
|李四 |87.0 |
|赵六 |77.0 |
|赵六 |92.0 |
|赵六 |24.0 |
|张三 |88.0 |
|张三 |77.0 |
|张三 |33.0 |
+----+-----+