Basic environment
Scala version: 2.12.8
JDK version: 1.8
Spark version: 2.4.1
Preparing the data
sku_sale_amount.csv
10001,36B0,range hood,2015,100
10001,27A3,range hood,2016,100
10001,27A3,range hood,2017,200
10002,36B0,range hood,2015,100
10002,36B0,range hood,2016,100
10002,36B0,range hood,2017,200
10002,58B5,cooktop,2014,200
10002,58B5,cooktop,2015,200
10002,58B5,cooktop,2016,200
10002,58B5,cooktop,2017,200
10003,64B8,dishwasher,2014,200
10003,727T,smart disinfection cabinet,2014,200
10004,64B8,water purifier,2014,150
10004,64B8,water purifier,2015,50
10004,64B8,water purifier,2016,50
10004,45A8,water purifier,2017,50
10004,64B8,built-in steam oven,2014,150
10004,64B8,built-in steam oven,2015,50
10004,64B8,built-in steam oven,2016,50
10004,45A8,built-in steam oven,2017,50
sku_product.csv
10001,1000
10001,2000
10001,4000
10001,6000
10002,8000
10002,10000
10002,9000
10002,6000
10003,5000
10004,4000
10004,3000
10004,44000
10004,11000
Preparing the basic environment
All of the examples below run inside a single driver object, DataSetSingleOperating; a sketch of it follows.
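A minimal sketch of that object, assuming a local SparkSession and that the two CSV files sit in the working directory. The column names (goods_id, sku, category, year, amount for skuIncomeDF; goods_id, price for skuProductDF) are assumptions inferred from the sample data and the examples below.

import org.apache.spark.sql.SparkSession

object DataSetSingleOperating {

  def main(args: Array[String]): Unit = {
    // Local session for experimenting; appName/master are placeholders.
    val spark = SparkSession.builder()
      .appName("DataSetSingleOperating")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // sku_sale_amount.csv: goods_id, sku, category, year, amount (names assumed).
    val skuIncomeDF = spark.read
      .option("inferSchema", "true")
      .csv("sku_sale_amount.csv")
      .toDF("goods_id", "sku", "category", "year", "amount")

    // sku_product.csv: goods_id plus a numeric column, treated here as a price (assumed).
    val skuProductDF = spark.read
      .option("inferSchema", "true")
      .csv("sku_product.csv")
      .toDF("goods_id", "price")

    skuIncomeDF.show(false)
    skuProductDF.show(false)

    spark.stop()
  }
}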
select operation
skuIncomeDF.select($"goods_id", $"sku", $"category", $"year".as("date"), $"amount")
selectExpr operation
skuIncomeDF.selectExpr("goods_id", "sku", "category", "year as date", "amount+1 as amount")
filter operation
skuIncomeDF.filter($"amount" > 150)
where operation
skuIncomeDF.where($"amount" > 150)
union operation
skuIncomeDF.union(skuIncomeDF).groupBy("sku").count()
groupBy operation
skuIncomeDF.groupBy("sku").count()
join operation
skuIncomeDF.join(skuIncomeDF, "sku")
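The example above joins skuIncomeDF with itself on sku. Joining the two prepared datasets instead would look like the sketch below, which relies on the assumed skuProductDF name and its goods_id/price columns from the setup sketch.

// Inner join on the shared goods_id column; skuProductDF and price are assumed names.
skuIncomeDF.join(skuProductDF, "goods_id")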
orderBy operation
Under the hood, orderBy still goes through sort.
skuIncomeDF.orderBy("sku", "category", "amount")
skuIncomeDF.orderBy($"sku", $"category", $"amount".desc)
skuIncomeDF.orderBy(col("sku"), col("category"), col("amount").desc)
sort operation
skuIncomeDF.sort("sku", "category", "amount")
sortWithinPartitions operation
Sorts rows within each partition, i.e. a local rather than a global sort. A typical pairing with repartition is sketched after the examples.
skuIncomeDF.sortWithinPartitions("sku", "category")
skuIncomeDF.sortWithinPartitions($"sku", $"category", $"amount".desc)
skuIncomeDF.sortWithinPartitions(col("sku"), col("category").desc)
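A sketch of that usual pairing: first co-locate rows that share a key, then sort only within each resulting partition. Repartitioning by category here is just an illustration.

// Rows of one category end up in the same partition and are ordered by amount
// inside that partition; there is no global ordering across partitions.
skuIncomeDF.repartition($"category").sortWithinPartitions($"amount".desc)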
withColumn operation
Purpose: adds a column, replacing it if a column with that name already exists and appending it otherwise; withColumnRenamed renames an existing column. The append case is sketched after the examples.
skuIncomeDF.withColumn("amount", $"amount" + 1)
skuIncomeDF.withColumn("amount", 'amount + 1)
skuIncomeDF.withColumnRenamed("amount", "amount1")
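The examples above only exercise the replace and rename cases. Adding a brand-new column uses the same call; amount_with_tax below is just an illustrative name.

// "amount_with_tax" does not exist yet, so withColumn appends it as a new column.
skuIncomeDF.withColumn("amount_with_tax", $"amount" * 1.1)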
foreach operation
skuIncomeDF.foreach(row => println(row.get(0)))
foreachPartition operation
skuIncomeDF.foreachPartition((it: Iterator[Row]) => it.foreach(row => println(row.get(0))))
distinct operation
skuIncomeDF.distinct()
dropDuplicates operation
skuIncomeDF.dropDuplicates("sku")
drop operation
Drops one or more columns.
skuIncomeDF.drop("sku", "category")
cube operation
Explanation: equivalent to grouping by each of the grouping sets (category, year), (category), (year), and () in turn and summing amount for each; a sketch of that expansion follows the example.
Reference: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-multi-dimensional-aggregation.html
skuIncomeDF.cube($"category", $"year".cast("string").as("year"))
  .agg(sum("amount") as "amount", grouping_id() as "gid")
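A rough sketch of what that cube expands to, written as explicit groupBy calls unioned together. The grouping_id column is left out, null stands for the "all values" bucket, and spark.implicits._ is assumed to be in scope as in the setup sketch.

import org.apache.spark.sql.functions.{lit, sum}

// (category, year) grouping set
val byBoth = skuIncomeDF
  .groupBy($"category", $"year".cast("string").as("year"))
  .agg(sum("amount") as "amount")

// (category) grouping set: year collapsed to null
val byCategory = skuIncomeDF
  .groupBy($"category")
  .agg(sum("amount") as "amount")
  .withColumn("year", lit(null).cast("string"))
  .select("category", "year", "amount")

// (year) grouping set: category collapsed to null
val byYear = skuIncomeDF
  .groupBy($"year".cast("string").as("year"))
  .agg(sum("amount") as "amount")
  .withColumn("category", lit(null).cast("string"))
  .select("category", "year", "amount")

// () grouping set: the grand total
val overall = skuIncomeDF
  .agg(sum("amount") as "amount")
  .withColumn("category", lit(null).cast("string"))
  .withColumn("year", lit(null).cast("string"))
  .select("category", "year", "amount")

byBoth.union(byCategory).union(byYear).union(overall).show(false)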
rollup operation
Explanation: equivalent to grouping by each of the grouping sets (category, year), (category), and () in turn and summing amount for each.
Reference: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-multi-dimensional-aggregation.html
skuIncomeDF.rollup($"category", $"year".cast("string").as("year"))
  .agg(sum("amount") as "amount", grouping_id() as "gid")
  .sort($"category".desc_nulls_last, $"year".asc_nulls_last)
pivot operation
skuIncomeDF.groupBy("category").pivot("year").sum("amount")
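Passing the pivot values explicitly is worth a mention: Spark then skips the extra job it otherwise runs just to collect the distinct values of the pivot column. The year list below simply mirrors the sample data and assumes year was read as an integer column, as in the setup sketch.

// One output column per listed year, each holding sum(amount) for that category/year.
skuIncomeDF.groupBy("category")
  .pivot("year", Seq(2014, 2015, 2016, 2017))
  .sum("amount")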
Transpose operation
Source: http://bailiwick.io/2017/10/21/transpose-data-with-spark/
// Import the requisite methods
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
// Create a dataframe
val df = spark.createDataFrame(Seq(
(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 7, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 8, 7, 3),
(6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", "col6")
df.show(10,false)
// Create the transpose helper function.
// Inputs:
// transDF: The dataframe which will be transposed
// transBy: The column that the dataframe will be transposed by
// Outputs:
// Dataframe datatype consisting of three columns:
// transBy
// column_name
// column_value
def transposeUDF(transDF: DataFrame, transBy: Seq[String]): DataFrame = {
val (cols, types) = transDF.dtypes.filter{ case (c, _) => !transBy.contains(c)}.unzip
require(types.distinct.size == 1)
val kvs = explode(array(
cols.map(c => struct(lit(c).alias("column_name"), col(c).alias("column_value"))): _*
))
val byExprs = transBy.map(col(_))
transDF
.select(byExprs :+ kvs.alias("_kvs"): _*)
.select(byExprs ++ Seq($"_kvs.column_name", $"_kvs.column_value"): _*)
}
transposeUDF(df, Seq("uid")).show(12,false)
Output:
df.show(10,false)
+---+----+----+----+----+----+----+
|uid|col1|col2|col3|col4|col5|col6|
+---+----+----+----+----+----+----+
|1 |1 |2 |3 |8 |4 |5 |
|2 |4 |3 |8 |7 |9 |8 |
|3 |6 |1 |9 |2 |3 |6 |
|4 |7 |8 |6 |9 |4 |5 |
|5 |9 |2 |7 |8 |7 |3 |
|6 |1 |1 |4 |2 |8 |4 |
+---+----+----+----+----+----+----+
transposeUDF(df, Seq("uid")).show(12,false)
+---+-----------+------------+
|uid|column_name|column_value|
+---+-----------+------------+
|1 |col1 |1 |
|1 |col2 |2 |
|1 |col3 |3 |
|1 |col4 |8 |
|1 |col5 |4 |
|1 |col6 |5 |
|2 |col1 |4 |
|2 |col2 |3 |
|2 |col3 |8 |
|2 |col4 |7 |
|2 |col5 |9 |
|2 |col6 |8 |
+---+-----------+------------+
only showing top 12 rows
References
https://legacy.gitbook.com/book/jaceklaskowski/mastering-spark-sql/details
https://www.toutiao.com/i6631318012546793992/