Flink-电商用户行为分析(页面广告点击量统计-黑名单过滤)_2

雨点打透心脏的1/2处 2022-11-21 04:10 210阅读 0赞

数据
链接:https://pan.baidu.com/s/1jX852lIDQ5Jm6nPRGe6pAA
提取码:km1w
对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信息。更加具体的应是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。
需求:
接下来我们就进行页面广告按照省份划分的点击量的统计,然后开一小时的时间窗口,
滑动距离为 5 秒,统计窗口内的点击事件数量。我们对一段时间内(比如一天内)的用户点击行为进行约束,如果对同一个广告点击超过一定限额(比如 100 次),应该把该用户加入黑名单并报警,此后其点击行为不应该再统计。

广告点击量统计

我们使用 process 和 aggregate 两种方式来实现窗口内的点击量统计:process 使用全窗口函数,aggregate 使用增量预聚合函数。
process

  1. import java.sql.Timestamp
  2. import org.apache.flink.streaming.api.TimeCharacteristic
  3. import org.apache.flink.streaming.api.scala._
  4. import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
  5. import org.apache.flink.streaming.api.windowing.time.Time
  6. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  7. import org.apache.flink.util.Collector
  8. //定义输入输出样例类
  9. case class AdClickEvent(userId:Long,adId:Long,province:String,city:String,timestamp:Long)
  10. case class AdCountByProvince(province:String,windowEnd:String,count:Long)
  11. object AdAnalysisByProvince {
  12. def main(args: Array[String]): Unit = {
  13. val env = StreamExecutionEnvironment.getExecutionEnvironment
  14. env.setParallelism(1)
  15. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  16. val resource = getClass.getResource("/AdClickLog.csv")
  17. val adLogStream = env.readTextFile(resource.getPath)
  18. .map(data=> {
  19. val dataArray = data.split(",")
  20. AdClickEvent(dataArray(0).toLong,dataArray(1).toLong,dataArray(2),dataArray(3),dataArray(4).toLong)
  21. })
  22. .assignAscendingTimestamps(_.timestamp * 1000L)
  23. //按照province分组开窗聚合统计
  24. val adCountStream = adLogStream
  25. .keyBy(_.province)
  26. .timeWindow(Time.hours(1),Time.seconds(5))
  27. .process(new AdCount())
  28. adCountStream.print()
  29. env.execute("ad analysis job")
  30. }
  31. }
  32. class AdCount() extends ProcessWindowFunction[AdClickEvent,AdCountByProvince,String,TimeWindow]{
  33. override def process(key: String, context: Context, elements: Iterable[AdClickEvent],
  34. out: Collector[AdCountByProvince]): Unit = {
  35. val province = key
  36. val windowEnd = new Timestamp(context.window.getEnd).toString
  37. val count = elements.size
  38. out.collect(AdCountByProvince(province,windowEnd,count))
  39. }
  40. }

aggregate

  1. import java.sql.Timestamp
  2. import org.apache.flink.api.common.functions.AggregateFunction
  3. import org.apache.flink.streaming.api.TimeCharacteristic
  4. import org.apache.flink.streaming.api.scala._
  5. import org.apache.flink.streaming.api.scala.function.{ ProcessWindowFunction, WindowFunction}
  6. import org.apache.flink.streaming.api.windowing.time.Time
  7. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  8. import org.apache.flink.util.Collector
  9. //定义输入输出样例类
  10. case class AdClickEvent(userId:Long,adId:Long,province:String,city:String,timestamp:Long)
  11. case class AdCountByProvince(province:String,windowEnd:String,count:Long)
  12. object AdAnalysisByProvince {
  13. def main(args: Array[String]): Unit = {
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  17. val resource = getClass.getResource("/AdClickLog.csv")
  18. val adLogStream = env.readTextFile(resource.getPath)
  19. .map(data=> {
  20. val dataArray = data.split(",")
  21. AdClickEvent(dataArray(0).toLong,dataArray(1).toLong,dataArray(2),dataArray(3),dataArray(4).toLong)
  22. })
  23. .assignAscendingTimestamps(_.timestamp * 1000L)
  24. //按照province分组开窗聚合统计
  25. val adCountStream = adLogStream
  26. .keyBy(_.province)
  27. .timeWindow(Time.hours(1),Time.seconds(5))
  28. .aggregate(new AdCountAgg(),new AdCountResult())
  29. adCountStream.print()
  30. env.execute("ad analysis job")
  31. }
  32. }
  33. //自定义预聚合函数,第一个In就是Map
  34. class AdCountAgg() extends AggregateFunction[AdClickEvent,Long,Long]{
  35. override def createAccumulator(): Long = 0L
  36. override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1
  37. override def getResult(accumulator: Long): Long = accumulator
  38. override def merge(a: Long, b: Long): Long = a + b
  39. }
  40. //自定义窗口函数,第一个参数就预聚合函数最后输出的值,Long
  41. class AdCountResult() extends WindowFunction[Long,AdCountByProvince,String,TimeWindow]{
  42. override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdCountByProvince]): Unit
  43. = {
  44. out.collect(AdCountByProvince(key,new Timestamp(window.getEnd).toString,input.head))
  45. }
  46. }

在这里插入图片描述

黑名单过滤

  1. import java.sql.Timestamp
  2. import org.apache.flink.api.common.functions.AggregateFunction
  3. import org.apache.flink.api.common.state.{ ValueState, ValueStateDescriptor}
  4. import org.apache.flink.streaming.api.TimeCharacteristic
  5. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  6. import org.apache.flink.streaming.api.scala._
  7. import org.apache.flink.streaming.api.scala.function.{ ProcessWindowFunction, WindowFunction}
  8. import org.apache.flink.streaming.api.windowing.time.Time
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  10. import org.apache.flink.util.Collector
  11. //定义输入输出样例类
  12. case class AdClickEvent(userId:Long,adId:Long,province:String,city:String,timestamp:Long)
  13. case class AdCountByProvince(province:String,windowEnd:String,count:Long)
  14. //定义测输出流报警信息样例类
  15. case class BlackListWarning(userId:Long,adId:Long,msg:String)
  16. object AdAnalysisByProvinceBlack {
  17. def main(args: Array[String]): Unit = {
  18. val env = StreamExecutionEnvironment.getExecutionEnvironment
  19. env.setParallelism(1)
  20. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  21. val resource = getClass.getResource("/AdClickLog.csv")
  22. val adLogStream = env.readTextFile(resource.getPath)
  23. .map(data=> {
  24. val dataArray = data.split(",")
  25. AdClickEvent(dataArray(0).toLong,dataArray(1).toLong,dataArray(2),dataArray(3),dataArray(4).toLong)
  26. })
  27. .assignAscendingTimestamps(_.timestamp * 1000L)
  28. //定义刷单行为过滤操作
  29. val filterBlackListStream = adLogStream
  30. .keyBy(data=> (data.userId,data.adId)) //按照用户和广告id进行集合
  31. .process(new FilterBlackList(100L))
  32. //按照province分组开窗聚合统计
  33. val adCountStream = filterBlackListStream
  34. .keyBy(_.province)
  35. .timeWindow(Time.hours(1),Time.seconds(5))
  36. .aggregate(new AdCountAgg(),new AdCountResult())
  37. adCountStream.print()
  38. filterBlackListStream.getSideOutput(new OutputTag[BlackListWarning]("blacklist")).print("blacklist")
  39. env.execute("ad analysis job")
  40. }
  41. }
  42. //实现自定义的ProcessFunction
  43. class FilterBlackList(maxClickCount: Long) extends KeyedProcessFunction[(Long,Long),AdClickEvent,AdClickEvent]{
  44. //定义状态,需要保存当前用户对当前广告的点击量count
  45. lazy val countState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))
  46. //标识位,用来表示用户是否已经在黑名单中
  47. lazy val isSendState:ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-sent",
  48. classOf[Boolean]))
  49. override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long,Long), AdClickEvent, AdClickEvent]
  50. #Context, out: Collector[AdClickEvent]): Unit = {
  51. //取出状态数据
  52. val curCount = countState.value()
  53. //如果是第一个数据,那么注册第二天0点的定时器,用于清空状态
  54. if (curCount == 0){
  55. val ts = (ctx.timerService().currentProcessingTime() / (1000*60*60*24) + 1) * (1000*60*60*24)
  56. ctx.timerService().registerProcessingTimeTimer(ts)
  57. }
  58. //判断count值是否达到上线,如果达到,并且之前没有输出过报警信息,那么报警
  59. if (curCount >= maxClickCount){
  60. if (!isSendState.value()){
  61. ctx.output(new OutputTag[BlackListWarning]("blacklist"),BlackListWarning(value.userId,value.adId,
  62. "click over"+maxClickCount+"times today"))
  63. isSendState.update(true)
  64. }
  65. return
  66. }
  67. //count值加1
  68. countState.update(curCount + 1)
  69. out.collect(value)
  70. }
  71. //0点触发定时器,直接清空状态
  72. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]
  73. #OnTimerContext, out: Collector[AdClickEvent]): Unit = {
  74. countState.clear()
  75. isSendState.clear()
  76. }
  77. }
  78. //自定义预聚合函数,第一个In就是Map
  79. class AdCountAgg() extends AggregateFunction[AdClickEvent,Long,Long]{
  80. override def createAccumulator(): Long = 0L
  81. override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1
  82. override def getResult(accumulator: Long): Long = accumulator
  83. override def merge(a: Long, b: Long): Long = a + b
  84. }
  85. //自定义窗口函数,第一个参数就预聚合函数最后输出的值,Long
  86. class AdCountResult() extends WindowFunction[Long,AdCountByProvince,String,TimeWindow]{
  87. override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdCountByProvince]): Unit
  88. = {
  89. out.collect(AdCountByProvince(key,new Timestamp(window.getEnd).toString,input.head))
  90. }
  91. }

在这里插入图片描述
我犯了一个低级错误,导致黑名单输出不出来
把这个

  1. val ts = (ctx.timerService().currentProcessingTime() / (1000*60*60*24) + 1) * (1000*60*60*24)

写成了

  1. val ts = (ctx.timerService().currentProcessingTime() / ((1000*60*60*24) + 1) * (1000*60*60*24))

发表评论

表情:
评论列表 (有 0 条评论,210人围观)

还没有评论,来说两句吧...

相关阅读