Spark DF/DS API:行列转换

前文回顾:Hive:行列转换

行转列

多行转多列

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Multi-row -> multi-column: pivot the distinct values of col2 into columns.
val rows1 = util.Arrays.asList(
  Row("a", "c", 1),
  Row("a", "d", 2),
  Row("b", "d", 5),
  Row("b", "e", 6)
)

// Two string key columns plus one integer measure column.
val schema1 = StructType(Seq(
  StructField("col1", StringType),
  StructField("col2", StringType),
  StructField("col3", IntegerType)
))
val df1 = spark.createDataFrame(rows1, schema1)

println("多行转多列")
df1.show()

// Group by the row key, turn each distinct col2 value into its own column,
// aggregate col3 with max, and fill the cells that have no source row with 0.
df1.groupBy('col1)
  .pivot('col2)
  .max("col3")
  .na.fill(0)
  .show()

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
多行转多列
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| c| 1|
| a| d| 2|
| b| d| 5|
| b| e| 6|
+----+----+----+

+----+---+---+---+
|col1| c| d| e|
+----+---+---+---+
| b| 0| 5| 6|
| a| 1| 2| 0|
+----+---+---+---+

多行转单列

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Multi-row -> single column: collapse each (col1, col2) group's col3 values
// into one comma-separated string.
val rows2 = util.Arrays.asList(
  Row("a", "b", 1),
  Row("a", "b", 2),
  Row("a", "b", 3),
  Row("b", "d", 4),
  Row("b", "d", 5),
  Row("b", "d", 6)
)

val schema2 = StructType(Seq(
  StructField("col1", StringType),
  StructField("col2", StringType),
  StructField("col3", IntegerType)
))
val df2 = spark.createDataFrame(rows2, schema2)
println("多行转单列")
df2.show()
// NOTE: collect_set removes duplicates and gives no ordering guarantee —
// the sample output below shows "5,6,4" for group (b, d).
df2.groupBy('col1, 'col2)
  .agg(concat_ws(",", collect_set('col3)).as("col3"))
  .show()

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
多行转单列
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| b| 1|
| a| b| 2|
| a| b| 3|
| b| d| 4|
| b| d| 5|
| b| d| 6|
+----+----+----+

+----+----+-----+
|col1|col2| col3|
+----+----+-----+
| a| b|1,2,3|
| b| d|5,6,4|
+----+----+-----+

列转行

多列转多行

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Multi-column -> multi-row: unpivot columns c/d/e into (label, value) rows.
val rows3 = util.Arrays.asList(
  Row("a", 1, 2, 3),
  Row("b", 4, 5, 6)
)

// One string key column and three integer value columns to unpivot.
val schema3 = StructType(Seq(
  StructField("col1", StringType),
  StructField("c", IntegerType),
  StructField("d", IntegerType),
  StructField("e", IntegerType)
))
val df3 = spark.createDataFrame(rows3, schema3)

println("多列转多行")
df3.show()
// stack(3, ...) emits 3 output rows per input row, pairing each column
// name literal with that column's value.
df3.selectExpr("col1", "stack(3, 'c', c, 'd', d, 'e', e) as (`col2`,`col3`)")
  .show()

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
多列转多行
+----+---+---+---+
|col1| c| d| e|
+----+---+---+---+
| a| 1| 2| 3|
| b| 4| 5| 6|
+----+---+---+---+

+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| c| 1|
| a| d| 2|
| a| e| 3|
| b| c| 4|
| b| d| 5|
| b| e| 6|
+----+----+----+

单列转多行

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Single column -> multi-row: split a comma-separated string and emit one
// output row per element.
val rows4 = util.Arrays.asList(
  Row("a", "b", "1,2,3"),
  Row("c", "d", "4,5,6")
)

val schema4 = StructType(Seq(
  StructField("col1", StringType),
  StructField("col2", StringType),
  StructField("col3", StringType)
))
val df4 = spark.createDataFrame(rows4, schema4)

println("单列转多行")
df4.show()
// split produces an array column; explode expands it to one row per element.
df4.select('col1, 'col2, explode(split('col3, ",")).as("col3")).show()

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
单列转多行
+----+----+-----+
|col1|col2| col3|
+----+----+-----+
| a| b|1,2,3|
| c| d|4,5,6|
+----+----+-----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| b| 1|
| a| b| 2|
| a| b| 3|
| c| d| 4|
| c| d| 5|
| c| d| 6|
+----+----+----+