网站用户行为分析项目之会话切割(六)=> 参数配置化

不念不忘少年蓝@ 2021-09-24 22:32 289阅读 0赞

大家好,我是邵奈一,一个不务正业的程序猿、正儿八经的斜杠青年。
1、世人称我为:被代码耽误的诗人、没天赋的书法家、五音不全的歌手、专业跑龙套演员、不合格的运动员…
2、这几年,我整理了很多IT技术相关的教程给大家,主要是大数据教程,帮助了很多小伙伴入坑大数据行业。
3、如果您觉得文章有用,请收藏,转发,评论,并关注我,谢谢!
博客导航跳转(请收藏):邵奈一的技术博客导航
| 公众号 | 微信 | 微博 | CSDN | 简书 |


教程目录

  • 0x00 教程内容
  • 0x01 运行模式配置化
  • 0x02 路径配置化
  • 0x03 输出类型配置化
  • 0xFF 总结

0x00 教程内容

  1. 运行模式配置化
  2. 路径配置化
  3. 输出类型配置化

注意:以下代码均在 SessionCutETL 中修改。

0x01 运行模式配置化

目前我们是在本地运行的,如果是放到集群运行,则需要修改相应的代码,如果在本地测试,又要加上之前的代码,非常不方便。于是,我们可以给运行的模式加上参数适配。

(1)给设置运行模式加上判断条件:

  1. conf.setMaster("local")
  2. //修改为:
  3. if(!conf.contains("spark.master")) {
  4. conf.setMaster("local")
  5. }

0x02 路径配置化

目前我们的路径都是写死在代码里的,我们应该进行参数化处理,如果有传参数则使用传参的值,如果没有,则使用默认值。

(1)添加以下四行代码,并且需要修改代码里的路径:

  1. // 通过配置拿到我们配置的输入和输出路径
  2. val visitLogsInputPath = conf.get("spark.sessioncut.visitLogsInputPath", "data/rawdata/visit_log.txt")
  3. val cookieLabelInputPath = conf.get("spark.sessioncut.cookieLabelInputPath", "data/cookie_label.txt")
  4. val baseOutputPath = conf.get("spark.sessioncut.baseOutputPath", "data/output")

0x03 输出类型配置化

目前我们是直接把输出类型在代码中写死的,需要改进。
(1)添加一行代码,并修改代码里的字符串:

  1. val outputFileType = if (args.nonEmpty) args(0) else "text"
  2. // 修改输出代码为:
  3. // text & parquet
  4. OutputComponent.fromOutputFileType(outputFileType).writeOutputData(sc,baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)

完整代码如下:
(2)SessionCutETL 的完整代码如下:

  1. package com.shaonaiyi.session
  2. import com.shaonaiyi.spark.session.{ TrackerLog, TrackerSession}
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.storage.StorageLevel
  5. import org.apache.spark.{ SparkConf, SparkContext}
  6. /** * @Auther: shaonaiyi@163.com * @Date: 2019/9/12 10:09 * @Description: 会话切割的程序主入口 */
  7. object SessionCutETL {
  8. private val logTypeSet = Set("pageview", "click")
  9. def main(args: Array[String]): Unit = {
  10. var conf = new SparkConf()
  11. conf.setAppName("SessionCutETL")
  12. if(!conf.contains("spark.master")) {
  13. conf.setMaster("local")
  14. }
  15. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  16. var sc = new SparkContext(conf)
  17. // 通过配置拿到我们配置的输入和输出路径
  18. val visitLogsInputPath = conf.get("spark.sessioncut.visitLogsInputPath", "data/rawdata/visit_log.txt")
  19. val cookieLabelInputPath = conf.get("spark.sessioncut.cookieLabelInputPath", "data/cookie_label.txt")
  20. val baseOutputPath = conf.get("spark.sessioncut.baseOutputPath", "data/output")
  21. val outputFileType = if (args.nonEmpty) args(0) else "text"
  22. //网站域名标签数据,此处只是演示,其实可以存放在数据库里
  23. val domainLabelMap = Map(
  24. "www.baidu.com" -> "level1",
  25. "www.taobao.com" -> "level2",
  26. "jd.com" -> "level3",
  27. "youku.com" -> "level4"
  28. )
  29. //广播
  30. val domainLabelMapB = sc.broadcast(domainLabelMap)
  31. // 1、加载日志源数据
  32. val rawRDD: RDD[String] = sc.textFile(visitLogsInputPath)
  33. //2、解析rawRDD中每一行日志源数据
  34. val parsedLogRDD: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))
  35. .filter(trackerLog => logTypeSet.contains(trackerLog.getLogType.toString))
  36. //缓存parsedLogRDD
  37. parsedLogRDD.persist(StorageLevel.MEMORY_AND_DISK)
  38. //3、按照cookie进行分组,一个cookie及一个user
  39. val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] = parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)
  40. //4、生成会话RDD
  41. val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>
  42. //处理每个user的日志
  43. val processor = new OneUserTrackerLogsProcessor(iter.toArray)
  44. // val processor = new OneUserTrackerLogsProcessor(iter.toArray) with PageviewSessionGenerator
  45. processor.buildSessions(domainLabelMapB.value)
  46. }
  47. //5、给会话的cookie打标签
  48. val cookieLabelRDD: RDD[(String, String)] = sc.textFile(cookieLabelInputPath).map { case line =>
  49. val temp = line.split("\\|")
  50. (temp(0), temp(1)) // (cookie, cookie_label)
  51. }
  52. val joinRDD: RDD[(String,(TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)
  53. val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map {
  54. case (cookie, (session, cookieLabelOpt)) =>
  55. if (cookieLabelOpt.nonEmpty) {
  56. session.setCookieLabel(cookieLabelOpt.get)
  57. } else {
  58. session.setCookieLabel("-")
  59. }
  60. session
  61. }
  62. //text & parquet
  63. OutputComponent.fromOutputFileType(outputFileType).writeOutputData(sc,baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)
  64. sc.stop()
  65. }
  66. }

0xFF 总结

  1. 下一篇文章我们会打包我们的代码放到集群去运行。

邵奈一 原创不易,如转载请标明出处,教育是一生的事业。


发表评论

表情:
评论列表 (有 0 条评论,289人围观)

还没有评论,来说两句吧...

相关阅读

    相关 用户行为分析系统

    刚刚做完提交代码,等待测试的反馈,怀着激动的心情,先看看效果,代码过段时间会放到github上。已经在这儿:[https://github.com/awhen/Big-data

    相关 行为参数(lambda)

    行为参数化:就是一个方法接收多个不同的行为参数,并在内部使用他们,完成不同的行为能力。行为参数化可以更好的适应不断变化的需求,减少实际开发工作量。 现在有一些需求,苹果农场的