
Spark SQL Practical Examples

Running totals

Preparing the data

access.csv

A,2015-01,5
A,2015-01,15
A,2015-01,5
A,2015-01,8
A,2015-02,4
A,2015-02,6
A,2015-03,16
A,2015-03,22
A,2015-04,10
A,2015-04,50
B,2015-01,5
B,2015-01,25
B,2015-02,10
B,2015-02,5
B,2015-03,23
B,2015-03,10
B,2015-03,1
B,2015-04,10
B,2015-04,50

Setting up the environment

import org.apache.spark.sql.SparkSession

object AccumulatorCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("AccumulatorCount")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Read the CSV data and name the columns
    val usersDF = spark.read.format("csv")
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "false")
      .load("src/main/resources/access.csv")
      .toDF("name", "month", "amount")
  }
}
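
A quick sanity check can help at this point: printing the inferred schema and a few rows confirms the load worked. With inferSchema enabled, name and month typically come back as strings and amount as an integer, though the exact types depend on the Spark version and the data.

// Sanity check of the loaded DataFrame (column names as set above)
usersDF.printSchema()
usersDF.show(5, false)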

Implementation

DataFrame API approach

Option 1:

// rowsBetween(Long.MinValue, 0): the frame runs from the first row of the partition up to the current row
val accuCntSpec = Window.partitionBy("name").orderBy("month").rowsBetween(Long.MinValue, 0)
usersDF.withColumn("acc_amount", sum(usersDF("amount")).over(accuCntSpec)).show()
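
Since Spark 2.1 the same frame can be written with the named boundary constants on Window, which reads a bit more clearly than raw Long values; a minimal equivalent sketch:

// Equivalent frame specification using the named boundary constants
val accuCntSpec2 = Window.partitionBy("name")
  .orderBy("month")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
usersDF.withColumn("acc_amount", sum($"amount").over(accuCntSpec2)).show()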

Option 2:

usersDF.select(
  $"name",
  $"month",
  $"amount",
  sum($"amount").over(accuCntSpec).as("acc_amount")
).show()

SQL approach

Approach: based on what the DataFrame operator does, look in the SqlBase.g4 grammar file to see whether the SQL parser supports this kind of syntax. SqlBase.g4 does contain the following:

windowFrame
    : frameType=RANGE start=frameBound
    | frameType=ROWS start=frameBound
    | frameType=RANGE BETWEEN start=frameBound AND end=frameBound
    | frameType=ROWS BETWEEN start=frameBound AND end=frameBound
    ;

frameBound
    : UNBOUNDED boundType=(PRECEDING | FOLLOWING)
    | boundType=CURRENT ROW
    | expression boundType=(PRECEDING | FOLLOWING)
    ;

In the Spark source, the SQLWindowFunctionSuite class in the sql/core module (package org.apache.spark.sql.execution) contains the following test method:

test("window function: multiple window expressions in a single expression") {
  val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
  nums.createOrReplaceTempView("nums")

  val expected =
    Row(1, 1, 1, 55, 1, 57) ::
    Row(0, 2, 3, 55, 2, 60) ::
    Row(1, 3, 6, 55, 4, 65) ::
    Row(0, 4, 10, 55, 6, 71) ::
    Row(1, 5, 15, 55, 9, 79) ::
    Row(0, 6, 21, 55, 12, 88) ::
    Row(1, 7, 28, 55, 16, 99) ::
    Row(0, 8, 36, 55, 20, 111) ::
    Row(1, 9, 45, 55, 25, 125) ::
    Row(0, 10, 55, 55, 30, 140) :: Nil

  val actual = sql(
    """
      |SELECT
      |  y,
      |  x,
      |  sum(x) OVER w1 AS running_sum,
      |  sum(x) OVER w2 AS total_sum,
      |  sum(x) OVER w3 AS running_sum_per_y,
      |  ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2
      |FROM nums
      |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW),
      |       w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING),
      |       w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
    """.stripMargin)

  checkAnswer(actual, expected)

  spark.catalog.dropTempView("nums")
}

Now we can happily follow that example and write our own SQL:

usersDF.createOrReplaceTempView("access")
spark.sql(
  """
    |select name,
    |       month,
    |       amount,
    |       sum(amount) over (partition by name order by month asc rows between unbounded preceding and current row) as acc_amount
    |from access
  """.stripMargin).show()

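One detail worth noting: the sample data contains several rows with the same name and month (for example, A,2015-01 appears four times), and with a ROWS frame the intermediate running totals for those tied rows depend on how the ties happen to be ordered. If every row of a month should instead show the total accumulated through the end of that month, a RANGE frame treats tied months as peers; a hedged sketch (acc_amount_by_month is just an illustrative alias):

// RANGE frame: rows with the same month are peers, so each row shows the
// total accumulated through the end of its month rather than a per-row running sum
spark.sql(
  """
    |select name,
    |       month,
    |       amount,
    |       sum(amount) over (partition by name order by month range between unbounded preceding and current row) as acc_amount_by_month
    |from access
  """.stripMargin).show()
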
Accumulating over the previous N rows (here N = 3)

DataFrame API approach

// rowsBetween(-3, 0): the 3 preceding rows plus the current row
val preThreeAccuCntSpec = Window.partitionBy("name").orderBy("month").rowsBetween(-3, 0)
usersDF.select(
  $"name",
  $"month",
  $"amount",
  sum($"amount").over(preThreeAccuCntSpec).as("acc_amount")).show()

SQL approach

spark.sql(
  """
    |select name,
    |       month,
    |       amount,
    |       sum(amount) over (partition by name order by month asc rows between 3 preceding and current row) as acc_amount
    |from access
  """.stripMargin).show()

Accumulating over the 3 preceding and 3 following rows

DataFrame API approach

// rowsBetween(-3, 3): the 3 preceding rows through the 3 following rows
val preThreeFiveAccuCntSpec = Window.partitionBy("name").orderBy("month").rowsBetween(-3, 3)
usersDF.select(
  $"name",
  $"month",
  $"amount",
  sum($"amount").over(preThreeFiveAccuCntSpec).as("acc_amount")).show()

SQL approach

spark.sql(
  """
    |select name,
    |       month,
    |       amount,
    |       sum(amount) over (partition by name order by month asc rows between 3 preceding and 3 following) as acc_amount
    |from access
  """.stripMargin).show()

Basic window function examples

Setting up the environment

object WindowFunctionTest extends BaseSparkSession {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("WindowFunctionTest")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    val df = List(
      ("位置1", "2018-01-01", 50),
      ("位置1", "2018-01-02", 45),
      ("位置1", "2018-01-03", 55),
      ("位置2", "2018-01-01", 25),
      ("位置2", "2018-01-02", 29),
      ("位置2", "2018-01-03", 27)
    ).toDF("site", "date", "user_cnt")
  }
}

Moving average

DataFrame API implementation

// The frame runs from -1 (the previous row) to 1 (the next row), so each sliding window contains up to 3 rows
val movingAvgSpec = Window.partitionBy("site").orderBy("date").rowsBetween(-1, 1)
df.withColumn("MovingAvg", avg(df("user_cnt")).over(movingAvgSpec)).show()
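
For reference, the moving averages can be worked out by hand from the six rows above; the first and last row of each site only have two rows in their frame:

// Expected moving averages, derived from the data above:
// 位置1: 2018-01-01 -> (50+45)/2    = 47.5
//        2018-01-02 -> (50+45+55)/3 = 50.0
//        2018-01-03 -> (45+55)/2    = 50.0
// 位置2: 2018-01-01 -> (25+29)/2    = 27.0
//        2018-01-02 -> (25+29+27)/3 = 27.0
//        2018-01-03 -> (29+27)/2    = 28.0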

SQL implementation

df.createOrReplaceTempView("site_info")
spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       avg(user_cnt) over(partition by site order by date rows between 1 preceding and 1 following) as moving_avg
    |from site_info
  """.stripMargin).show()

Previous row's value

DataFrame API implementation

val lagwSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("prevUserCnt", lag(df("user_cnt"), 1).over(lagwSpec)).show()
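
lag returns null for the first row of each partition, since there is no preceding row. If a sentinel is preferred, the three-argument overload takes a default value; a small sketch (the default 0 here is just an illustration):

// Use 0 instead of null when there is no previous row
df.withColumn("prevUserCnt", lag(df("user_cnt"), 1, 0).over(lagwSpec)).show()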

SQL implementation

df.createOrReplaceTempView("site_info")
spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       lag(user_cnt, 1) over(partition by site order by date asc) as prevUserCnt
    |from site_info
  """.stripMargin).show()

Ranking

DataFrame API implementation

val rankwSpec = Window.partitionBy("site").orderBy("date")
df.withColumn("rank", rank().over(rankwSpec)).show()

SQL approach

spark.sql(
  """
    |select site,
    |       date,
    |       user_cnt,
    |       rank() over(partition by site order by date asc) as rank
    |from site_info
  """.stripMargin).show()

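rank, dense_rank and row_number differ only in how they handle ties in the ordering column: rank leaves gaps after equal values, dense_rank does not, and row_number numbers tied rows arbitrarily. A small comparison sketch reusing the window spec defined above:

// Compare the three ranking functions over the same window
df.select(
  $"site",
  $"date",
  $"user_cnt",
  rank().over(rankwSpec).as("rank"),
  dense_rank().over(rankwSpec).as("dense_rank"),
  row_number().over(rankwSpec).as("row_number")
).show()
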
Top-N per group, and per-group minimum

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object GroupBy {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("GroupBy")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    val rows = spark.sparkContext.parallelize(
      List(
        ("shop2", "2018-02-22", 1),
        ("shop2", "2018-02-27", 1),
        ("shop2", "2018-03-13", 1),
        ("shop2", "2018-03-20", 5),
        ("shop1", "2018-03-27", 1),
        ("shop1", "2018-04-03", 1),
        ("shop1", "2018-04-10", 1),
        ("shop1", "2018-04-17", 1),
        ("shop2", "2018-04-28", 1),
        ("shop2", "2018-04-05", 10),
        ("shop2", "2018-04-09", 1)))
    val rowRDD = rows.map(t => Row(t._1, t._2, t._3))
    val schema = StructType(
      Array(
        StructField("shop", StringType),
        StructField("ycd_date", StringType),
        StructField("ycd_num", IntegerType)))
    val df = spark.createDataFrame(rowRDD, schema)
    df.createOrReplaceTempView("ycd_order")

    // Top-N per group: keep the latest ycd_date per shop (N = 1, i.e. rank < 2)
    val topN = spark.sql("select * from (SELECT o.shop, o.ycd_date, row_number() over (PARTITION BY o.shop ORDER BY o.ycd_date DESC) rank FROM ycd_order as o) o1 where rank < 2")
    // Group by shop and take the minimum ycd_num of each group
    val groupMin = spark.sql("select o.shop, min(o.ycd_num) as min_num from ycd_order as o group by o.shop order by min_num")

    topN.show()
    groupMin.show()
    spark.stop()
  }
}
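
The same top-N-per-group pattern can also be written with the DataFrame API; a minimal sketch under the same assumptions (N = 1, newest ycd_date first, run in the same scope as the df above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows within each shop by date, newest first, and keep the top row
val byShopLatest = Window.partitionBy("shop").orderBy(col("ycd_date").desc)
val topNDF = df
  .withColumn("rank", row_number().over(byShopLatest))
  .where(col("rank") < 2)
topNDF.show()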

A cleaner way to define a schema

def getSchema(): StructType = {
  val schemaString = "store_id:String,order_date:String,sale_amount:Int"
  val fields = schemaString.split(",")
    .map(fieldName =>
      StructField(fieldName.split(":")(0).trim,
        fieldName.split(":")(1).trim match {
          case "String" => StringType
          case "Int" => IntegerType
        }, true))
  StructType(fields)
}
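
A usage sketch: a schema built this way can be passed to the reader instead of relying on inferSchema (the file path and its contents here are only placeholders):

// Hypothetical path; replace with a real CSV of store_id, order_date, sale_amount
val ordersDF = spark.read
  .schema(getSchema())
  .option("header", "false")
  .csv("src/main/resources/orders.csv")
ordersDF.printSchema()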

Keeping n decimal places

In decimal(10, 2), 10 is the precision (total number of digits) and 2 is the scale (digits after the decimal point); the precision must be >= the number of digits in the actual value, otherwise the result is NULL.

spark.sql("select cast(sale_amount as decimal(10, 2)) from ycd").show()
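
If the goal is simply rounding rather than enforcing a decimal type, the built-in round function avoids the NULL-on-overflow behaviour; a sketch against the same table:

// round keeps the original numeric type and does not NULL out on precision overflow
spark.sql("select round(sale_amount, 2) as sale_amount_2dp from ycd").show()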

Renaming columns

import org.apache.spark.sql.SparkSession

val firstDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF()
// Rename all columns at once by supplying a full list of names
val colNames = Seq("uid", "col1", "col2", "col3", "col4", "col5", "col6")
val secondDF = firstDF.toDF(colNames: _*)
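
When only one column needs a new name, withColumnRenamed is simpler than rebuilding the whole header; a small sketch on the DataFrame above:

// Rename just one column, leaving the others untouched
val renamedDF = secondDF.withColumnRenamed("uid", "user_id")
renamedDF.printSchema()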

Transpose

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
import spark.implicits._ // needed for the $"..." column syntax below

val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", "col6")

df.show(10, false)

// Create the transpose function.
// Inputs:
//   transDF: the DataFrame to be transposed
//   transBy: the columns to transpose by
// Output:
//   a DataFrame with the transBy columns plus column_name and column_value
def transposeUDF(transDF: DataFrame, transBy: Seq[String]): DataFrame = {
  // all non-key columns must share a single data type
  val (cols, types) = transDF.dtypes.filter { case (c, _) => !transBy.contains(c) }.unzip
  require(types.distinct.size == 1)

  // build an array of (column_name, column_value) structs and explode it into rows
  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("column_name"), col(c).alias("column_value"))): _*
  ))
  val byExprs = transBy.map(col(_))

  transDF
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.column_name", $"_kvs.column_value"): _*)
}

transposeUDF(df, Seq("uid")).show(12, false)

Output:

df.show(10, false)
+---+----+----+----+----+----+----+
|uid|col1|col2|col3|col4|col5|col6|
+---+----+----+----+----+----+----+
|1  |1   |2   |3   |8   |4   |5   |
|2  |4   |3   |8   |7   |9   |8   |
|3  |6   |1   |9   |2   |3   |6   |
|4  |7   |8   |6   |9   |4   |5   |
|5  |9   |2   |7   |8   |7   |3   |
|6  |1   |1   |4   |2   |8   |4   |
+---+----+----+----+----+----+----+

transposeUDF(df, Seq("uid")).show(12, false)
+---+-----------+------------+
|uid|column_name|column_value|
+---+-----------+------------+
|1  |col1       |1           |
|1  |col2       |2           |
|1  |col3       |3           |
|1  |col4       |8           |
|1  |col5       |4           |
|1  |col6       |5           |
|2  |col1       |4           |
|2  |col2       |3           |
|2  |col3       |8           |
|2  |col4       |7           |
|2  |col5       |9           |
|2  |col6       |8           |
+---+-----------+------------+
only showing top 12 rows
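
When the columns to unpivot are known up front, the built-in stack generator achieves the same result without a helper function; a hedged sketch with the column list written out by hand:

import org.apache.spark.sql.functions.expr

// stack(n, name1, value1, name2, value2, ...) emits n rows of (column_name, column_value) per input row
df.select(
  $"uid",
  expr("stack(6, 'col1', col1, 'col2', col2, 'col3', col3, 'col4', col4, 'col5', col5, 'col6', col6) as (column_name, column_value)")
).show(12, false)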
