Flink-电商用户行为分析(基于服务器 log 的热门页面浏览量统计)

桃扇骨 2023-02-23 08:57 65阅读 0赞

数据
链接:https://pan.baidu.com/s/1gT6GhMQckShlbhLCqb_pWQ
提取码:558h
我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从 web 服务器的日志中提取出来。
我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行 log,统计在一段时间内用户访问每一个 url 的次数,然后排序输出显示。
具体做法为:每隔 5 秒,输出最近 10 分钟内访问量最多的前 N 个 URL
需要注意的是,原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个 SimpleDateFormat 将其转换为我们需要的时间戳(毫秒)格式:

  // Pattern of the log's time field, e.g. "20/05/2015:17:05:11".
  // No zone is in the pattern, so the text is interpreted in the JVM's default time zone.
  val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  // Token 3 of the space-split log line is the time field; getTime yields epoch milliseconds.
  val rawTime = dataArray(3).trim
  val timestamp = simpleDateFormat.parse(rawTime).getTime

完整代码

  1. import java.sql.Timestamp
  2. import java.text.SimpleDateFormat
  3. import org.apache.flink.api.common.functions.AggregateFunction
  4. import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor}
  5. import org.apache.flink.api.scala._
  6. import org.apache.flink.streaming.api.TimeCharacteristic
  7. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  8. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  9. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  10. import org.apache.flink.streaming.api.scala.function.WindowFunction
  11. import org.apache.flink.streaming.api.windowing.time.Time
  12. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  13. import org.apache.flink.util.Collector
  14. import scala.collection.mutable.ListBuffer
  /**
   * One parsed access-log line (input record of the job). Sample raw line:
   * 204.62.56.3 - - 20/05/2015:17:05:11 +0000 GET /presentations/logstash-puppetconf-2012/images/nagios-sms2.png
   *
   * @param ip       client IP address (token 0 of the raw line)
   * @param userId   user id field (token 1; "-" in the sample line)
   * @param evetTime event time in epoch milliseconds (field name kept as-is for compatibility)
   * @param method   HTTP method, e.g. GET (token 5)
   * @param url      requested URL path (token 6)
   */
  case class ApacheLogEvent(
      ip: String,
      userId: String,
      evetTime: Long,
      method: String,
      url: String
  )
  /**
   * Window aggregation result: how often `url` was requested in one window.
   *
   * @param url       requested URL
   * @param windowEnd end timestamp of the window (TimeWindow.getEnd, epoch milliseconds)
   * @param count     number of requests for `url` in that window
   */
  case class UrlViewCount(
      url: String,
      windowEnd: Long,
      count: Long
  )
  /**
   * Hot-page statistics from an Apache access log.
   *
   * Reads the log, assigns event-time timestamps, counts the views of every URL in
   * 10-minute windows sliding every 5 seconds, and prints the top 5 URLs of each window.
   *
   * Usage: `NetworkFlow [path-to-apache.log]`. Without an argument the original
   * hard-coded path is used, so existing run configurations keep working.
   */
  object NetworkFlow {
    def main(args: Array[String]): Unit = {
      val inputPath = args.headOption.getOrElse(
        "D:\\idea\\flinkUser\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")

      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      val dataStream = env.readTextFile(inputPath)
        .map(data => {
          // Tokens: 0 = ip, 1 = user id, 3 = time ("20/05/2015:17:05:11"), 4 = zone,
          // 5 = HTTP method, 6 = url.
          val dataArray = data.split(" ")
          // NOTE(review): the zone token (index 4, "+0000" in the sample) is not parsed, so the
          // time is interpreted in the JVM's default time zone — confirm this is intended.
          val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
          val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime
          ApacheLogEvent(dataArray(0).trim, dataArray(1).trim, timestamp, dataArray(5).trim, dataArray(6).trim)
        })
        // Date.getTime already returns epoch milliseconds, so evetTime is used as-is (no * 1000).
        // Watermarks trail the largest timestamp seen by 1 second to tolerate slight disorder.
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
          override def extractTimestamp(t: ApacheLogEvent): Long = t.evetTime
        })
        .keyBy(_.url)
        .timeWindow(Time.minutes(10), Time.seconds(5))
        // Keep each window for 60 more seconds so late events re-fire it with an updated count.
        .allowedLateness(Time.seconds(60))
        .aggregate(new CountAgg(), new WindowResult())
        // Regroup the per-URL counts by window so each window can be ranked as a whole.
        .keyBy(_.windowEnd)
        .process(new TopNHotUrls(5))

      dataStream.print()
      env.execute("network flow job")
    }
  }
  /**
   * Incremental pre-aggregation for the URL windows: counts the events of one URL in one
   * window. The accumulator and the result are both that running count.
   */
  class CountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
    // Every window starts counting from zero.
    override def createAccumulator(): Long = 0L

    // The event's content is irrelevant; each event adds one view.
    override def add(in: ApacheLogEvent, acc: Long): Long = 1L + acc

    override def getResult(acc: Long): Long = acc

    // Two partial counts combine by simple addition.
    override def merge(acc: Long, acc1: Long): Long = acc1 + acc
  }
  /**
   * Window function used together with [[CountAgg]] in `aggregate`: wraps the pre-aggregated
   * count of a URL, the URL itself and the window end into a [[UrlViewCount]].
   */
  class WindowResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                       out: Collector[UrlViewCount]): Unit = {
      // After incremental aggregation the input holds the single count for this key and window.
      val viewCount = input.head
      val windowEnd = window.getEnd
      out.collect(UrlViewCount(key, windowEnd, viewCount))
    }
  }
  /**
   * Emits the `topSize` most viewed URLs of each window as formatted text.
   *
   * The stream is keyed by window end (the `Long` key type), so the state seen here holds the
   * per-URL counts of one window. Counts are stored in a map keyed by URL. When the upstream
   * window fires again because of a late event (allowed lateness), the new total for that URL
   * replaces the old one instead of being appended as a duplicate, and the ranking is emitted
   * again with the corrected counts. The window's state is dropped only after the lateness
   * period has passed, so later updates are still ranked against all URLs of the window.
   *
   * @param topSize           number of URLs to emit per window
   * @param allowedLatenessMs allowed lateness of the upstream window, in milliseconds
   *                          (default 60 s, matching the job); state is kept this long after
   *                          the window end
   */
  class TopNHotUrls(topSize: Int, allowedLatenessMs: Long = 60 * 1000L)
      extends KeyedProcessFunction[Long, UrlViewCount, String] {
    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}

    require(allowedLatenessMs >= 0, "allowedLatenessMs must be non-negative")

    // url -> latest count of that url in the window identified by the current key.
    lazy val urlState: MapState[String, Long] = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long]("url-map-state", classOf[String], classOf[Long]))

    override def processElement(i: UrlViewCount,
                                context: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
                                collector: Collector[String]): Unit = {
      // Overwrite, don't append: a late re-firing carries the updated total for this url.
      urlState.put(i.url, i.count)
      // Ranking timer: fires once the watermark passes the window end, i.e. after the on-time
      // results of all URLs of this window have arrived. Re-registering it after a late update
      // makes the ranking be emitted again on the next watermark.
      context.timerService().registerEventTimeTimer(i.windowEnd + 1)
      // Cleanup timer: fires after the upstream window can no longer be updated.
      context.timerService().registerEventTimeTimer(cleanupTime(i.windowEnd))
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      val windowEnd = ctx.getCurrentKey
      // With zero lateness both timers share one timestamp (Flink keeps a single timer per key
      // and timestamp), so both conditions are checked independently: emit first, then clear.
      if (timestamp == windowEnd + 1) {
        out.collect(formatTopN(windowEnd))
      }
      if (timestamp >= cleanupTime(windowEnd)) {
        urlState.clear()
      }
    }

    // Event time after which the state of the window ending at `windowEnd` can be dropped.
    private def cleanupTime(windowEnd: Long): Long = windowEnd + 1 + allowedLatenessMs

    // Builds the ranking text of one window from the current map state, highest count first.
    private def formatTopN(windowEnd: Long): String = {
      val allUrlViews = ListBuffer.empty[(String, Long)]
      val iter = urlState.entries().iterator()
      while (iter.hasNext) {
        val entry = iter.next()
        allUrlViews += ((entry.getKey, entry.getValue))
      }
      val sortedUrlViews = allUrlViews.sortWith(_._2 > _._2).take(topSize)

      val result = new StringBuilder()
      result.append("时间:").append(new Timestamp(windowEnd)).append("\n")
      for (((url, count), i) <- sortedUrlViews.zipWithIndex) {
        result.append("NO").append(i + 1).append(":")
          .append("URL=").append(url)
          .append("访问量=").append(count).append("\n")
      }
      result.append("=====================")
      result.toString()
    }
  }

(此处原为程序运行结果截图)
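但是这样写有 bug。设置了 allowedLateness(60 秒)之后,迟到数据会让同一个窗口再次触发计算,下游会再收到同一 URL、同一 windowEnd 的新计数。TopNHotUrls 用 ListState 只追加不覆盖,并且在第一次输出排名后就 clear 了状态。因此迟到数据触发的那次输出只包含被更新的少数 URL,排名结果不正确;如果迟到更新恰好在排名定时器触发之前到达,还可能出现同一 URL 重复出现。常见的修正思路是:改用以 URL 为 key 的 MapState 覆盖旧计数,并等迟到时间过去之后再清理状态。下面这篇文章进行了优化: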
https://blog.csdn.net/qq_46548855/article/details/109296967

发表评论

表情:
评论列表 (有 0 条评论,65人围观)

还没有评论,来说两句吧...

相关阅读