Flink-电商用户行为分析(基于服务器 log的热门页面浏览量统计 )
数据
链接:https://pan.baidu.com/s/1gT6GhMQckShlbhLCqb\_pWQ
提取码:558h
我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从 web 服务器的日志中提取出来。
我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行 log,统计在一段时间内用户访问每一个 url 的次数,然后排序输出显示。
具体做法为:每隔 5 秒,输出最近 10 分钟内访问量最多的前 N 个 URL
需要注意的是,原始日志中的时间是“dd/MM/yyyymm:ss”的形式,需要定义一个 DateTimeFormat 将其转换为我们需要的时间戳格式:
val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime
完整代码
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
//204.62.56.3 - - 20/05/2015:17:05:11 +0000 GET /presentations/logstash-puppetconf-2012/images/nagios-sms2.png
//输入数据样例类
case class ApacheLogEvent(ip:String,userId:String,evetTime:Long,method:String,url:String)
//窗口聚合结果样例类
case class UrlViewCount(url:String,windowEnd:Long,count:Long)
object NetworkFlow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.readTextFile("D:\\idea\\flinkUser\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")
.map(data=>{
val dataArray = data.split(" ")
//定义事件转换 20/05/2015:17:05:11
val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime
ApacheLogEvent(dataArray(0).trim,dataArray(1).trim,timestamp,dataArray(5).trim,dataArray(6).trim)
})
//*1000看是秒还是毫秒 秒*1000 毫秒不乘
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
override def extractTimestamp(t: ApacheLogEvent): Long = t.evetTime
})
.keyBy(_.url)
.timeWindow(Time.minutes(10),Time.seconds(5))
//允许60秒的延迟数据去更新
.allowedLateness(Time.seconds(60))
.aggregate(new CountAgg(),new WindowResult())
.keyBy(_.windowEnd)
.process(new TopNHotUrls(5))
dataStream.print()
env.execute("network flow job")
}
}
//自定义预聚合函数
class CountAgg() extends AggregateFunction[ApacheLogEvent,Long,Long]{
override def createAccumulator(): Long = 0L
override def add(in: ApacheLogEvent, acc: Long): Long = acc+1
override def getResult(acc: Long): Long = acc
override def merge(acc: Long, acc1: Long): Long = acc + acc1
}
//自定义窗口函数
class WindowResult() extends WindowFunction[Long,UrlViewCount,String,TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long],
out: Collector[UrlViewCount]): Unit = {
out.collect(UrlViewCount(key,window.getEnd,input.iterator.next()))
}
}
//自定义的处理函数 第一个windowend的类型
class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long,UrlViewCount,String]{
lazy val urlState:ListState[UrlViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]
("url-state",classOf[UrlViewCount]))
override def processElement(i: UrlViewCount, context: KeyedProcessFunction[Long, UrlViewCount, String]
#Context, collector: Collector[String]): Unit = {
urlState.add(i)
context.timerService().registerEventTimeTimer(i.windowEnd + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]
#OnTimerContext, out: Collector[String]): Unit = {
//从状态中拿到数据
val allUrlViews:ListBuffer[UrlViewCount] = new ListBuffer[UrlViewCount]
val iter = urlState.get().iterator()
while (iter.hasNext){
allUrlViews += iter.next()
}
urlState.clear()
//sortWith从高到底
val sortedUrlViews = allUrlViews.sortWith(_.count>_.count).take(topSize)
//格式化输出
val result:StringBuilder = new StringBuilder()
result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
for (i <- sortedUrlViews.indices){
val currentUrlView = sortedUrlViews(i)
result.append("NO").append(i+1).append(":")
.append("URL=").append(currentUrlView.url)
.append("访问量=").append(currentUrlView.count).append("\n")
}
result.append("=====================")
Thread.sleep(1000)
out.collect(result.toString())
}
}
但是这样写有bug,下面这篇文章进行了优化
https://blog.csdn.net/qq_46548855/article/details/109296967
还没有评论,来说两句吧...