Spark: Maximum Consecutive Login Days per User

Requirement

Compute the maximum number of consecutive login days for each user, implemented both with the RDD API and with the DF/DS (DataFrame/Dataset) API.

Output format:

user    times  start_date  end_date
user_1  4      2021-08-01  2021-08-04
user_2  3      2021-07-30  2021-08-01

Data

user_1,20210801
user_1,20210802
user_1,20210803
user_1,20210804
user_1,20210806
user_1,20210807
user_1,20210808
user_1,20210811
user_1,20210812
user_2,20210730
user_2,20210731
user_2,20210801
user_2,20210804
user_2,20210806

RDD Implementation

Code

import org.apache.spark.{SparkConf, SparkContext}

import java.text.SimpleDateFormat

object spring_job2 {
  def main(args: Array[String]): Unit = {

    val input = "data/login.log"

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getCanonicalName)
    val sc = new SparkContext(sparkConf)
    val d = new SimpleDateFormat("yyyyMMdd")
    val d2 = new SimpleDateFormat("yyyy-MM-dd")

    sc.textFile(input).map(_.split(","))
      .map(x => (x(0), x(1)))                  // (user, yyyyMMdd)
      .groupByKey()
      .mapValues(_.toList.sorted               // groupByKey gives no ordering guarantee, so sort dates ascending
        .zipWithIndex.map(x => {
          // Subtract the index (in days) from the date: consecutive dates collapse to the same relative_day.
          val relative_day = d.format(d.parse(x._1).getTime - x._2.toLong * 1000 * 60 * 60 * 24)
          val end_day = x._1
          (relative_day, end_day)
        })
        // One entry per streak: (length, first day, last day).
        .groupBy(_._1).map(x => (x._2.size, x._2.map(_._2).min, x._2.map(_._2).max))
      )
      // Tuples compare element-wise, so max picks the longest streak; format its dates as yyyy-MM-dd.
      .map(x => (x._1, x._2.max._1, d2.format(d.parse(x._2.max._2)), d2.format(d.parse(x._2.max._3))))
      .collect().foreach(println)

    sc.stop()
  }
}

Approach

  1. Group by user and order by date.

    (user_2,CompactBuffer(20210730, 20210731, 20210801, 20210804, 20210806))
    (user_1,CompactBuffer(20210801, 20210802, 20210803, 20210804, 20210806, 20210807, 20210808, 20210811, 20210812))
  2. Within each group, call zipWithIndex to add an index field: every record in the group gets an index, starting at 0 and increasing by 1.

    (user_2,List((20210730,0), (20210731,1), (20210801,2), (20210804,3), (20210806,4)))
    (user_1,List((20210801,0), (20210802,1), (20210803,2), (20210804,3), (20210806,4), (20210807,5), (20210808,6), (20210811,7), (20210812,8)))
  3. Add a relative_day field, a relative start date computed as (current date - index days); for a run of consecutive logins this date is identical (a minimal plain-Scala sketch of this trick follows the list).

    (user_2,List((20210730,20210730), (20210730,20210731), (20210730,20210801), (20210801,20210804), (20210802,20210806)))
    (user_1,List((20210801,20210801), (20210801,20210802), (20210801,20210803), (20210801,20210804), (20210802,20210806), (20210802,20210807), (20210802,20210808), (20210804,20210811), (20210804,20210812)))
  4. With relative_day computed, group by relative_day to find the consecutive-login groups.

    (user_2,Map(20210730 -> List((20210730,20210730), (20210730,20210731), (20210730,20210801)), 20210801 -> List((20210801,20210804)), 20210802 -> List((20210802,20210806))))
    (user_1,Map(20210801 -> List((20210801,20210801), (20210801,20210802), (20210801,20210803), (20210801,20210804)), 20210802 -> List((20210802,20210806), (20210802,20210807), (20210802,20210808)), 20210804 -> List((20210804,20210811), (20210804,20210812))))
  5. Walk over each group and emit its number of consecutive login days (size), its start date (min of the date field), and its end date (max of the date field).

    (user_2,List((3,20210730,20210801), (1,20210804,20210804), (1,20210806,20210806)))
    (user_1,List((4,20210801,20210804), (3,20210806,20210808), (2,20210811,20210812)))
  6. Pick the group with the largest consecutive-login count and format the output.

    (user_2,3,2021-07-30,2021-08-01)
    (user_1,4,2021-08-01,2021-08-04)
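
Why step 3 works can be seen without Spark at all. Below is a minimal plain-Scala sketch of the date-minus-index trick on user_2's dates (it uses java.time instead of SimpleDateFormat purely for illustration); it reproduces the groups shown in step 5.

import java.time.LocalDate
import java.time.format.DateTimeFormatter

val fmt   = DateTimeFormatter.ofPattern("yyyyMMdd")
val dates = List("20210730", "20210731", "20210801", "20210804", "20210806")

// Subtract each record's index from its date; consecutive dates collapse to one value.
val withRelativeDay = dates.zipWithIndex.map { case (day, idx) =>
  (LocalDate.parse(day, fmt).minusDays(idx), day)
}

// Group by the relative day: each group is one consecutive-login streak.
val streaks = withRelativeDay.groupBy(_._1).values
  .map(g => (g.size, g.map(_._2).min, g.map(_._2).max))

streaks.foreach(println)
// prints, in some order: (3,20210730,20210801), (1,20210804,20210804), (1,20210806,20210806)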

DF/DS API

Code

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

import java.text.SimpleDateFormat

object spring_job2_df {
  def main(args: Array[String]): Unit = {

    val input = "data/login.log"

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()

    val d = new SimpleDateFormat("yyyyMMdd")
    val d2 = new SimpleDateFormat("yyyy-MM-dd")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val df = spark.read.textFile(input)

    // Parse "user_1,20210801" into Info(user, "2021-08-01").
    val frame = df.map(x => {
      val arr = x.split(",")
      val user = arr(0)
      val date = d2.format(d.parse(arr(1)))
      Info(user, date)
    }).toDF()

    frame.createOrReplaceTempView("login")

    spark.sql(
      """
        |select
        |  t4.user, t4.times, t4.start_date, t4.end_date
        |from
        |  (with t3 as (
        |    -- t3: one row per consecutive-login streak
        |    select
        |      t2.user, count(1) as times, min(t2.date) as start_date, max(t2.date) as end_date
        |    from
        |      -- t2: date minus index is constant within a streak
        |      (select
        |        t1.user, t1.date, date_add(t1.date, -t1.index) as relative_day
        |      from
        |        -- t1: number each user's logins in date order (dates are unique per user, so rank() behaves like row_number())
        |        (select
        |          user, date, rank() over(partition by user order by date) as index
        |        from login
        |        ) t1
        |      ) t2
        |    group by t2.user, t2.relative_day
        |  )
        |  -- t4: rank each user's streaks by length, longest first
        |  select t3.*, rank() over(partition by t3.user order by t3.times desc) rk from t3
        |  ) t4
        |where rk = 1
        |""".stripMargin).show(false)

    spark.stop()
  }
  case class Info(user: String, date: String)
}

t1

+------+----------+-----+
|user  |date      |index|
+------+----------+-----+
|user_1|2021-08-01|1    |
|user_1|2021-08-02|2    |
|user_1|2021-08-03|3    |
|user_1|2021-08-04|4    |
|user_1|2021-08-06|5    |
|user_1|2021-08-07|6    |
|user_1|2021-08-08|7    |
|user_1|2021-08-11|8    |
|user_1|2021-08-12|9    |
|user_2|2021-07-30|1    |
|user_2|2021-07-31|2    |
|user_2|2021-08-01|3    |
|user_2|2021-08-04|4    |
|user_2|2021-08-06|5    |
+------+----------+-----+
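
Each of the intermediate tables in this walkthrough can be reproduced by running the corresponding subquery from the full statement on its own against the login view; for example, t1 above is just the innermost query:

// Inspect the t1 stage by running the innermost subquery by itself.
spark.sql(
  """
    |select user, date, rank() over(partition by user order by date) as index
    |from login
    |""".stripMargin).show(false)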

t2

+------+----------+------------+
|user  |date      |relative_day|
+------+----------+------------+
|user_1|2021-08-01|2021-07-31  |
|user_1|2021-08-02|2021-07-31  |
|user_1|2021-08-03|2021-07-31  |
|user_1|2021-08-04|2021-07-31  |
|user_1|2021-08-06|2021-08-01  |
|user_1|2021-08-07|2021-08-01  |
|user_1|2021-08-08|2021-08-01  |
|user_1|2021-08-11|2021-08-03  |
|user_1|2021-08-12|2021-08-03  |
|user_2|2021-07-30|2021-07-29  |
|user_2|2021-07-31|2021-07-29  |
|user_2|2021-08-01|2021-07-29  |
|user_2|2021-08-04|2021-07-31  |
|user_2|2021-08-06|2021-08-01  |
+------+----------+------------+

t3

+------+-----+----------+----------+
|user  |times|start_date|end_date  |
+------+-----+----------+----------+
|user_1|4    |2021-08-01|2021-08-04|
|user_1|3    |2021-08-06|2021-08-08|
|user_1|2    |2021-08-11|2021-08-12|
|user_2|3    |2021-07-30|2021-08-01|
|user_2|1    |2021-08-04|2021-08-04|
|user_2|1    |2021-08-06|2021-08-06|
+------+-----+----------+----------+

t4

+------+-----+----------+----------+---+
|user  |times|start_date|end_date  |rk |
+------+-----+----------+----------+---+
|user_1|4    |2021-08-01|2021-08-04|1  |
|user_1|3    |2021-08-06|2021-08-08|2  |
|user_1|2    |2021-08-11|2021-08-12|3  |
|user_2|3    |2021-07-30|2021-08-01|1  |
|user_2|1    |2021-08-04|2021-08-04|2  |
|user_2|1    |2021-08-06|2021-08-06|2  |
+------+-----+----------+----------+---+

result

+------+-----+----------+----------+
|user  |times|start_date|end_date  |
+------+-----+----------+----------+
|user_1|4    |2021-08-01|2021-08-04|
|user_2|3    |2021-07-30|2021-08-01|
+------+-----+----------+----------+
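
The SQL above already executes through the DataFrame engine, but the same logic can also be written directly against the DataFrame/Dataset functions API with window functions. The following is a sketch only, assuming Spark 2.4+; the object name spring_job2_window and the helper columns day and rn are illustrative, not part of the original code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object spring_job2_window {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    import spark.implicits._

    // Parse "user_1,20210801" into (user, day) and derive a proper date column.
    val login = spark.read.textFile("data/login.log")
      .map { line =>
        val arr = line.split(",")
        (arr(0), arr(1))
      }
      .toDF("user", "day")
      .withColumn("date", to_date($"day", "yyyyMMdd"))

    val byDate   = Window.partitionBy($"user").orderBy($"date")
    val byLength = Window.partitionBy($"user").orderBy(desc("times"))

    login
      // Same trick as the SQL version: date minus row number is constant within a streak.
      .withColumn("rn", row_number().over(byDate))
      .withColumn("relative_day", expr("date_sub(date, rn)"))
      .groupBy($"user", $"relative_day")
      .agg(
        count(lit(1)).as("times"),
        min($"date").as("start_date"),
        max($"date").as("end_date"))
      // Keep only each user's longest streak(s).
      .withColumn("rk", rank().over(byLength))
      .where($"rk" === 1)
      .select("user", "times", "start_date", "end_date")
      .show(false)

    spark.stop()
  }
}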