网站用户行为分析项目之会话切割(四)=> 代码重构

墨蓝 2021-09-23 13:48 328阅读 0赞

文章目录

  • 0x00 文章内容
  • 0x01 实现输出代码的重构
            1. 抽离输出代码
            1. 重构输出路径
            1. 重构输出文件类型
  • 0x02 校验结果
            1. 生成不同的输出文件类型
  • 0xFF 总结

0x00 文章内容

  1. 实现输出代码的重构
  2. 校验结果

0x01 实现输出代码的重构

1. 抽离输出代码

a. 因为SessionCutETL里的main方法写了比较多的代码,此时我们可以将第6步骤的输出代码进行抽离,全选,选中Refactor=>Extract=>Method
在这里插入图片描述
b. 我们这里选择第一个在这里插入图片描述
c. 填写方法名,点击OK
在这里插入图片描述
即可发现已经抽离出了方法。

2. 重构输出路径

a. 下面的代码中路径都是写死的,而且出现了共同的路径,我们可以进行统一

  1. val trackerLogOutputPath = "data/output/trackerLog"
  2. val trackerSessionOutputPath = "data/output/trackerSession"

修改如下:

  1. writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)
  2. private def writeOutputData(sc: SparkContext, baseOutputPath: String, parsedLogRDD: RDD[TrackerLog], cookieLabeledSessionRDD: RDD[TrackerSession]) = {
  3. val trackerLogOutputPath = s"${baseOutputPath}/trackerLog"
  4. val trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"

我们暂且这样重构先,此时需要重新执行一下代码,看一下改后是否能执行。

d. 执行验证,执行会报错,表示路径的文件已经存在,此时可以手动删除再执行,这里下一步是在代码中实现删除。

  1. Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory data/output/trackerLog already exists

e. 添加判断路径代码

  1. val fileSystem = FileSystem.get(sc.hadoopConfiguration)
  2. val path = new Path(trackerLogOutputPath)
  3. if (fileSystem.exists(path)) {
  4. fileSystem.delete(path, true)
  5. }

写完之后再抽离出去,取名deletePathIfExists方法。

f. 此时再重新执行代码,发现代码没有报错,也能得到想要的结果。

3. 重构输出文件类型

目前的情况是以Parquet的形式保存着,如果此时如果需求发生变化了,或者有其他格式的需求,如保存成TextFile格式。就要重新改代码了,如果需求又变了,或者写成其他组件,又要重新改,重新然后打包,重新然后上传,这样非常麻烦。试想,如果此时将需要修改的代码抽象成一个接口,就会大大方便了。

a. 本优化内容比较多,涉及两个类,代码量及过程比较多:
在这里插入图片描述
b. OutputComponent完整代码如下:

  1. package com.shaonaiyi.session
  2. import com.shaonaiyi.spark.session.{ TrackerLog, TrackerSession}
  3. import org.apache.hadoop.fs.{ FileSystem, Path}
  4. import org.apache.parquet.avro.{ AvroParquetOutputFormat, AvroWriteSupport}
  5. import org.apache.spark.SparkContext
  6. import org.apache.spark.rdd.RDD
  7. /** * @Auther: shaonaiyi@163.com * @Date: 2019/12/30 21:09 * @Description: 抽象输出文件组件 */
  8. trait OutputComponent {
  9. def writeOutputData(sc: SparkContext, baseOutputPath: String,
  10. parsedLogRDD: RDD[TrackerLog],
  11. cookieLabeledSessionRDD: RDD[TrackerSession]) = {
  12. deletePathIfExists(sc, baseOutputPath)
  13. }
  14. private def deletePathIfExists(sc: SparkContext, trackerLogOutputPath: String) = {
  15. val fileSystem = FileSystem.get(sc.hadoopConfiguration)
  16. val path = new Path(trackerLogOutputPath)
  17. if (fileSystem.exists(path)) {
  18. fileSystem.delete(path, true)
  19. }
  20. }
  21. }
  22. object OutputComponent {
  23. def fromOutputFileType(fileType: String) = {
  24. if (fileType.equals("parquet")) {
  25. new ParquetFileOutput
  26. } else {
  27. new TextFileOutput
  28. }
  29. }
  30. }
  31. /** * 写Parquet格式文件 */
  32. class ParquetFileOutput extends OutputComponent {
  33. override def writeOutputData(sc: SparkContext, baseOutputPath: String,
  34. parsedLogRDD: RDD[TrackerLog],
  35. cookieLabeledSessionRDD: RDD[TrackerSession]): Unit = {
  36. super.writeOutputData(sc, baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)
  37. //6、保存数据
  38. //6.1、保存TrackerLog,对应的是parsedLogRDD
  39. val trackerLogOutputPath = s"${baseOutputPath}/trackerLog"
  40. AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerLog.SCHEMA$)
  41. parsedLogRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerLogOutputPath,
  42. classOf[Void], classOf[TrackerLog], classOf[AvroParquetOutputFormat[TrackerLog]]
  43. )
  44. //6.2、保存TrackerSession,对应的是cookieLabeledSessionRDD
  45. val trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"
  46. AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerSession.SCHEMA$)
  47. cookieLabeledSessionRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerSessionOutputPath,
  48. classOf[Void], classOf[TrackerSession], classOf[AvroParquetOutputFormat[TrackerSession]]
  49. )
  50. }
  51. }
  52. /** * 写TextFile格式文件 */
  53. class TextFileOutput extends OutputComponent {
  54. override def writeOutputData(sc: SparkContext, baseOutputPath: String,
  55. parsedLogRDD: RDD[TrackerLog],
  56. cookieLabeledSessionRDD: RDD[TrackerSession]): Unit = {
  57. super.writeOutputData(sc, baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)
  58. //6、保存数据
  59. //6.1、保存TrackerLog,对应的是parsedLogRDD
  60. val trackerLogOutputPath = s"${baseOutputPath}/trackerLog"
  61. parsedLogRDD.saveAsTextFile(trackerLogOutputPath)
  62. //6.2、保存TrackerSession,对应的是cookieLabeledSessionRDD
  63. val trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"
  64. cookieLabeledSessionRDD.saveAsTextFile(trackerSessionOutputPath)
  65. }
  66. }

编写思路:先写OutputComponent接口,然后写ParquetFileOutput类继承OutputComponent,最后写一个伴生类OutputComponent以方便调用,最后进行代码优化,将deletePathIfExists抽离出来。

代码讲解:

  1. OutputComponent是一个Trait类型,属于接口抽象类,目的将输出格式接口化,原本是只有一个Parquet格式的,现在再添加了一个TextFile格式,实现的功能其实就是与输出Parquet格式的代码相类似。ParquetFileOutputTextFileOutput均继承此类,所以需要实现writeOutputData方法,写好之后,需要再写一个伴生类来调用,并且判断输入的是哪种类型;除此之外,还简化了deletePathIfExists方法,统一用super进行了了调用。
  2. saveAsNewAPIHadoopFile需要的参数是key-value类型,所以需要转成key-value先。

c. SessionCutETL完整代码如下:

  1. package com.shaonaiyi.session
  2. import com.shaonaiyi.spark.session.{ TrackerLog, TrackerSession}
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{ SparkConf, SparkContext}
  5. /** * @Auther: shaonaiyi@163.com * @Date: 2019/9/12 10:09 * @Description: 会话切割的程序主入口 */
  6. object SessionCutETL {
  7. private val logTypeSet = Set("pageview", "click")
  8. def main(args: Array[String]): Unit = {
  9. var conf = new SparkConf()
  10. conf.setAppName("SessionCutETL")
  11. conf.setMaster("local")
  12. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  13. var sc = new SparkContext(conf)
  14. //网站域名标签数据,此处只是演示,其实可以存放在数据库里
  15. val domainLabelMap = Map(
  16. "www.baidu.com" -> "level1",
  17. "www.taobao.com" -> "level2",
  18. "jd.com" -> "level3",
  19. "youku.com" -> "level4"
  20. )
  21. //广播
  22. val domainLabelMapB = sc.broadcast(domainLabelMap)
  23. // sc.setLogLevel("ERROR")
  24. // 1、加载日志源数据
  25. val rawRDD: RDD[String] = sc.textFile("data/rawdata/visit_log.txt")
  26. // rawRDD.collect().foreach(println)
  27. //2、解析rawRDD中每一行日志源数据
  28. // val parsedLogRDD: RDD[Option[TrackerLog]] = rawRDD.map( line => RawLogParserUtil.parse(line))
  29. // val parsedLogRDD1: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))
  30. val parsedLogRDD: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))
  31. .filter(trackerLog => logTypeSet.contains(trackerLog.getLogType.toString))
  32. // parsedLogRDD.collect().foreach(println)
  33. // parsedLogRDD1.collect().foreach(println)
  34. //3、按照cookie进行分组
  35. val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] = parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)
  36. // cookieGroupRDD.collect().foreach(println)
  37. //4、按user进行分组
  38. val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>
  39. //处理每个user的日志
  40. val processor = new OneUserTrackerLogsProcessor(iter.toArray)
  41. processor.buildSessions(domainLabelMapB.value)
  42. }
  43. //5、给会话的cookie打标签
  44. val cookieLabelRDD: RDD[(String, String)] = sc.textFile("data/cookie_label.txt").map { case line =>
  45. val temp = line.split("\\|")
  46. (temp(0), temp(1)) // (cookie, cookie_label)
  47. }
  48. val joinRDD: RDD[(String,(TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)
  49. val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map {
  50. case (cookie, (session, cookieLabelOpt)) =>
  51. if (cookieLabelOpt.nonEmpty) {
  52. session.setCookieLabel(cookieLabelOpt.get)
  53. } else {
  54. session.setCookieLabel("-")
  55. }
  56. session
  57. }
  58. //text & parquet
  59. OutputComponent.fromOutputFileType("text").writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)
  60. sc.stop()
  61. }
  62. }

代码讲解:删除原先的输出格式的代码,用OutputComponent调用来实现。

0x02 校验结果

1. 生成不同的输出文件类型

a. 执行SessionCutETL类,查看文件的输出类型,此时生成了TextFile格式文件。
在这里插入图片描述
b. 修改此行代码的”text”为parquet,重新执行,查看结果,此时生成了Parquet格式文件。

  1. //text & parquet
  2. OutputComponent.fromOutputFileType("parquet").writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)

在这里插入图片描述

0xFF 总结

  1. 代码重构是一项非常重要的技能,增强代码的可读性。
  2. 下一篇文章还会继续优化,请留意本博客,关注、评论、加点赞。
  3. 网站用户行为分析项目系列:
    网站用户行为分析项目之会话切割(一)
    网站用户行为分析项目之会话切割(二)
    网站用户行为分析项目之会话切割(三)
    网站用户行为分析项目之会话切割(四)待补充
    网站用户行为分析项目之会话切割(五)待补充

作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |

福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。


发表评论

表情:
评论列表 (有 0 条评论,328人围观)

还没有评论,来说两句吧...

相关阅读

    相关 用户行为分析系统

    刚刚做完提交代码,等待测试的反馈,怀着激动的心情,先看看效果,代码过段时间会放到github上。已经在这儿:[https://github.com/awhen/Big-data