Requirement

Use the RDD and DF/DS APIs to compute, for each user, the maximum number of consecutive login days.

Output format:

```
user    times  start_date  end_date
user_1  4      2021-08-01  2021-08-04
user_2  3      2021-07-30  2021-08-01
```
Data

```
user_1,20210801
user_1,20210802
user_1,20210803
user_1,20210804
user_1,20210806
user_1,20210807
user_1,20210808
user_1,20210811
user_1,20210812
user_2,20210730
user_2,20210731
user_2,20210801
user_2,20210804
user_2,20210806
```
RDD implementation

```scala
import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat

object spring_job2 {
  def main(args: Array[String]): Unit = {
    val input = "data/login.log"
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getCanonicalName)
    val sc = new SparkContext(sparkConf)
    val d  = new SimpleDateFormat("yyyyMMdd")
    val d2 = new SimpleDateFormat("yyyy-MM-dd")

    sc.textFile(input)
      .map(_.split(","))
      .map(x => (x(0), x(1)))
      .groupByKey()
      // sort the dates first: groupByKey gives no ordering guarantee
      .mapValues(_.toList.sorted.zipWithIndex
        .map { case (date, index) =>
          // relative_day = date - index days; identical for consecutive dates
          // (Long multiplier avoids Int overflow on long login histories)
          val relative_day = d.format(d.parse(date).getTime - index.toLong * 24 * 60 * 60 * 1000)
          (relative_day, date)
        }
        // each relative_day bucket is one run of consecutive logins:
        // (length, first date, last date)
        .groupBy(_._1)
        .map(x => (x._2.size, x._2.map(_._2).min, x._2.map(_._2).max)))
      // keep the longest run per user and reformat the dates
      .map(x => (x._1, x._2.max._1, d2.format(d.parse(x._2.max._2)), d2.format(d.parse(x._2.max._3))))
      .collect().foreach(println)

    sc.stop()
  }
}
```
Approach
Group by user, with the dates ordered within each group:
```
(user_2,CompactBuffer(20210730, 20210731, 20210801, 20210804, 20210806))
(user_1,CompactBuffer(20210801, 20210802, 20210803, 20210804, 20210806, 20210807, 20210808, 20210811, 20210812))
```
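On plain Scala collections the same grouping step is just a `groupBy` — a minimal sketch (not Spark; the sample lines are a subset of the data, and the explicit `sorted` stands in for the ordering that `groupByKey` does not guarantee):

```scala
// group the raw "user,date" lines by user, as the RDD's groupByKey does
val lines = List(
  "user_1,20210801", "user_1,20210802", "user_1,20210803",
  "user_2,20210730", "user_2,20210731")
val grouped = lines
  .map(_.split(","))
  .map(a => (a(0), a(1)))
  .groupBy(_._1)                                     // user -> List((user, date))
  .map { case (u, vs) => (u, vs.map(_._2).sorted) }  // keep only dates, sorted
println(grouped)
```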
Within each group, `zipWithIndex` adds an `index` field: every record in the group gets an index, starting at 0 and increasing by 1:
```
(user_2,List((20210730,0), (20210731,1), (20210801,2), (20210804,3), (20210806,4)))
(user_1,List((20210801,0), (20210802,1), (20210803,2), (20210804,3), (20210806,4), (20210807,5), (20210808,6), (20210811,7), (20210812,8)))
```
Add a `relative_day` field: the record's date minus its index, in days. For consecutive logins this value stays the same:
```
(user_2,List((20210730,20210730), (20210730,20210731), (20210730,20210801), (20210801,20210804), (20210802,20210806)))
(user_1,List((20210801,20210801), (20210801,20210802), (20210801,20210803), (20210801,20210804), (20210802,20210806), (20210802,20210807), (20210802,20210808), (20210804,20210811), (20210804,20210812)))
```
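The date-minus-index trick can be checked in isolation with `java.time` (a sketch over user_1's dates only, outside Spark):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
val dates = List("20210801", "20210802", "20210803", "20210804",
                 "20210806", "20210807", "20210808", "20210811", "20210812")
// subtract each record's index from its date; consecutive dates
// collapse onto the same relative_day
val tagged = dates.zipWithIndex.map { case (s, i) =>
  (LocalDate.parse(s, fmt).minusDays(i).toString, s)
}
// the first four records all share relative_day 2021-08-01
println(tagged)
```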
With `relative_day` in hand, group by it to isolate each run of consecutive logins:
```
(user_2,Map(20210730 -> List((20210730,20210730), (20210730,20210731), (20210730,20210801)), 20210801 -> List((20210801,20210804)), 20210802 -> List((20210802,20210806))))
(user_1,Map(20210801 -> List((20210801,20210801), (20210801,20210802), (20210801,20210803), (20210801,20210804)), 20210802 -> List((20210802,20210806), (20210802,20210807), (20210802,20210808)), 20210804 -> List((20210804,20210811), (20210804,20210812))))
```
For each run, emit the number of consecutive login days (the group size), the start date (min of the date field), and the end date (max of the date field):
```
(user_2,List((3,20210730,20210801), (1,20210804,20210804), (1,20210806,20210806)))
(user_1,List((4,20210801,20210804), (3,20210806,20210808), (2,20210811,20210812)))
```
Pick the run with the largest count and format the dates for output:
```
(user_2,3,2021-07-30,2021-08-01)
(user_1,4,2021-08-01,2021-08-04)
```
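The whole chain above can be replayed on plain collections to sanity-check the result (a sketch, no Spark involved; dates kept in the raw yyyyMMdd form):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
val log = Map(
  "user_1" -> List("20210801", "20210802", "20210803", "20210804",
                   "20210806", "20210807", "20210808", "20210811", "20210812"),
  "user_2" -> List("20210730", "20210731", "20210801", "20210804", "20210806"))

val result = log.map { case (user, dates) =>
  val runs = dates.sorted.zipWithIndex
    .map { case (s, i) => (LocalDate.parse(s, fmt).minusDays(i), s) } // tag with relative_day
    .groupBy(_._1).values                                            // one group per run
    .map(g => (g.size, g.map(_._2).min, g.map(_._2).max))            // (days, start, end)
  val best = runs.max // tuple ordering: the longest run wins
  (user, best._1, best._2, best._3)
}
result.foreach(println)
```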
DF/DS API implementation

```scala
import org.apache.spark.sql.SparkSession
import java.text.SimpleDateFormat

object spring_job2_df {
  def main(args: Array[String]): Unit = {
    val input = "data/login.log"
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val d  = new SimpleDateFormat("yyyyMMdd")
    val d2 = new SimpleDateFormat("yyyy-MM-dd")
    import spark.implicits._

    // parse each "user,yyyyMMdd" line and convert the date to yyyy-MM-dd
    // so that Spark SQL's date functions can work on it
    val frame = spark.read.textFile(input).map { x =>
      val arr = x.split(",")
      Info(arr(0), d2.format(d.parse(arr(1))))
    }.toDF()
    frame.createOrReplaceTempView("login")

    spark.sql(
      """
        |with
        |t1 as ( -- number each user's logins in date order
        |  select user, date,
        |         rank() over (partition by user order by date) as index
        |  from login
        |),
        |t2 as ( -- date minus index is constant within a consecutive run
        |  select user, date, date_add(date, -index) as relative_day
        |  from t1
        |),
        |t3 as ( -- one row per run: length, start date, end date
        |  select user,
        |         count(1)  as times,
        |         min(date) as start_date,
        |         max(date) as end_date
        |  from t2
        |  group by user, relative_day
        |),
        |t4 as ( -- rank each user's runs by length
        |  select t3.*,
        |         rank() over (partition by user order by times desc) as rk
        |  from t3
        |)
        |select user, times, start_date, end_date
        |from t4
        |where rk = 1
        |""".stripMargin).show(false)

    spark.stop()
  }

  case class Info(user: String, date: String)
}
```
Table t1

```
+------+----------+-----+
|user  |date      |index|
+------+----------+-----+
|user_1|2021-08-01|1    |
|user_1|2021-08-02|2    |
|user_1|2021-08-03|3    |
|user_1|2021-08-04|4    |
|user_1|2021-08-06|5    |
|user_1|2021-08-07|6    |
|user_1|2021-08-08|7    |
|user_1|2021-08-11|8    |
|user_1|2021-08-12|9    |
|user_2|2021-07-30|1    |
|user_2|2021-07-31|2    |
|user_2|2021-08-01|3    |
|user_2|2021-08-04|4    |
|user_2|2021-08-06|5    |
+------+----------+-----+
```
t2

```
+------+----------+------------+
|user  |date      |relative_day|
+------+----------+------------+
|user_1|2021-08-01|2021-07-31  |
|user_1|2021-08-02|2021-07-31  |
|user_1|2021-08-03|2021-07-31  |
|user_1|2021-08-04|2021-07-31  |
|user_1|2021-08-06|2021-08-01  |
|user_1|2021-08-07|2021-08-01  |
|user_1|2021-08-08|2021-08-01  |
|user_1|2021-08-11|2021-08-03  |
|user_1|2021-08-12|2021-08-03  |
|user_2|2021-07-30|2021-07-29  |
|user_2|2021-07-31|2021-07-29  |
|user_2|2021-08-01|2021-07-29  |
|user_2|2021-08-04|2021-07-31  |
|user_2|2021-08-06|2021-08-01  |
+------+----------+------------+
```
t3

```
+------+-----+----------+----------+
|user  |times|start_date|end_date  |
+------+-----+----------+----------+
|user_1|4    |2021-08-01|2021-08-04|
|user_1|3    |2021-08-06|2021-08-08|
|user_1|2    |2021-08-11|2021-08-12|
|user_2|3    |2021-07-30|2021-08-01|
|user_2|1    |2021-08-04|2021-08-04|
|user_2|1    |2021-08-06|2021-08-06|
+------+-----+----------+----------+
```
t4

```
+------+-----+----------+----------+---+
|user  |times|start_date|end_date  |rk |
+------+-----+----------+----------+---+
|user_1|4    |2021-08-01|2021-08-04|1  |
|user_1|3    |2021-08-06|2021-08-08|2  |
|user_1|2    |2021-08-11|2021-08-12|3  |
|user_2|3    |2021-07-30|2021-08-01|1  |
|user_2|1    |2021-08-04|2021-08-04|2  |
|user_2|1    |2021-08-06|2021-08-06|2  |
+------+-----+----------+----------+---+
```
Result

```
+------+-----+----------+----------+
|user  |times|start_date|end_date  |
+------+-----+----------+----------+
|user_1|4    |2021-08-01|2021-08-04|
|user_2|3    |2021-07-30|2021-08-01|
+------+-----+----------+----------+
```