scala spark dataframe 是大数据从业者必须掌握的基础知识点。 对于数据分析人员经常会有对数据表进行转置/透视的需求,还有对数据进行行转列,列转行,多列多行合并的处理。无论哪种情况,对于常年使用SQL语句来进行数据分析的技术人员应该都是信手拈来的操作,但是对于很大使用大数据组件Spark来进行数据处理和分析的选手来说可能就需要了解除了常规SQL之外的操作方式了。
数据样例
val spark = SparkSession .builder() .appName("HelloWorld") .getOrCreate() import spark.implicits._ val data = spark.sparkContext.parallelize(List( ( 1, "张三", 88.0, "Mat"), ( 2, "李四", 67.0, "Mat"), ( 3, "赵六", 77.0, "Mat"), ( 4, "王五", 65.0, "Mat"), ( 1, "张三", 77.0, "Eng"), ( 2, "李四", 90.0, "Eng"), ( 3, "赵六", 24.0, "Eng"), ( 4, "王五", 90.0, "Eng"), ( 1, "张三", 33.0, "Chinese"), ( 2, "李四", 87.0, "Chinese"), ( 3, "赵六", 92.0, "Chinese"), ( 4, "王五", 87.0, "Chinese") )) val df = data.toDF("id","name","scores","class_names").cache() df.show(false) val _cla = df.select("class_names").distinct().rdd.map(_ (0).toString).collect() print(_cla.toList)
# dataframe show +---+----+------+-----------+ |id |name|scores|class_names| +---+----+------+-----------+ |1 |张三 |88.0 |Mat | |2 |李四 |67.0 |Mat | |3 |赵六 |77.0 |Mat | |4 |王五 |65.0 |Mat | |1 |张三 |77.0 |Eng | |2 |李四 |90.0 |Eng | |3 |赵六 |24.0 |Eng | |4 |王五 |90.0 |Eng | |1 |张三 |33.0 |Chinese | |2 |李四 |87.0 |Chinese | |3 |赵六 |92.0 |Chinese | |4 |王五 |87.0 |Chinese | +---+----+------+-----------+ # 三门课程 List(Eng, Chinese, Mat)
转置/透视表 pivot unpivot 一行转多列
以下代码是通过dataframe的pivot方法,按照名字分组,分组后去第一个分数;最终实现转置表或者是透视表的效果。
val _pivot = df.groupBy("name").pivot("class_names", _cla).agg(functions.first("scores")) _pivot.show(false)
+----+----+-------+----+ |name|Eng |Chinese|Mat | +----+----+-------+----+ |王五 |90.0|87.0 |65.0| |李四 |90.0|87.0 |67.0| |赵六 |24.0|92.0 |77.0| |张三 |77.0|33.0 |88.0| +----+----+-------+----+
而如果想要把透视表逆转回去就要用hive 的stack函数来实现,而spark dataframe中没有相关的方法,具体实现如下两种实现方式的代码;但是有两个知识点要注意下
- 数字一定要能被后边参数名称去重后的个数整出, 如代码中’Analytics’, Mat, ‘BI’, Eng, ‘Ingestion’, Chinese ,去重后肯定能整除3;
- stack从第二个参数开始两两一组,’Analytics’, 和 Mat, 分别是变成行之后的具体数值 ,以及之前的字段名是什么,读者可以从下边的show详情中看到。
- 如果字段名称有中文,要使用反引号**`** 把字段包起来;如”stack(2, ‘数学’, `数学`,’英语’,`英语`), 第一个数学是单引号,第二个是反引号,第一个是要最终展示的文本,第二个是字段名
_pivot.select($"Name", unctions.expr("stack(3, 'Analytics', Mat, 'BI', Eng, 'Ingestion', Chinese) as (kecheng, fenshu)")).show(false) # 相同效果的实现方式 // _pivot.selectExpr("name","stack(3, 'Analytics', Mat, 'BI', Eng, 'Ingestion', Chinese) as (kecheng, fenshu)").show(false)
+----+---------+------+ |Name|kecheng |fenshu| +----+---------+------+ |王五 |Analytics|65.0 | |王五 |BI |90.0 | |王五 |Ingestion|87.0 | |李四 |Analytics|67.0 | |李四 |BI |90.0 | |李四 |Ingestion|87.0 | |赵六 |Analytics|77.0 | |赵六 |BI |24.0 | |赵六 |Ingestion|92.0 | |张三 |Analytics|88.0 | |张三 |BI |77.0 | |张三 |Ingestion|33.0 | +----+---------+------+
行专列 多行合并成一列
df.groupBy("name").agg(functions.concat_ws(",",functions.collect_set($"scores".cast("string"))).as("fenshu")).show(false)
+----+--------------+ |name|fenshu | +----+--------------+ |王五 |90.0,65.0,87.0| |李四 |67.0,90.0,87.0| |赵六 |77.0,92.0,24.0| |张三 |88.0,77.0,33.0| +----+--------------+
列转行 一列转多行
通过functions的split 方法把字符串类型的列先转换成Array类型,之后在通过explode方法把Array转换成多行
import org.apache.spark.sql.functions val res = df.groupBy("name").agg(functions.concat_ws(",",functions.collect_set($"scores".cast("string"))).as("fenshu")) res.show(false) res.select($"name", functions.explode(functions.split($"fenshu", ",")).as("score")).show(false)
# 多个数值在一列中 +----+--------------+ |name|fenshu | +----+--------------+ |王五 |90.0,65.0,87.0| |李四 |67.0,90.0,87.0| |赵六 |77.0,92.0,24.0| |张三 |88.0,77.0,33.0| +----+--------------+
# 一列数据拆分成多行 +----+-----+ |name|score| +----+-----+ |王五 |90.0 | |王五 |65.0 | |王五 |87.0 | |李四 |67.0 | |李四 |90.0 | |李四 |87.0 | |赵六 |77.0 | |赵六 |92.0 | |赵六 |24.0 | |张三 |88.0 | |张三 |77.0 | |张三 |33.0 | +----+-----+