Flink E-commerce User Behavior Analysis (Page Ad Click Statistics - Blacklist Filtering)
Data
Link: https://pan.baidu.com/s/1jX852lIDQ5Jm6nPRGe6pAA
Extraction code: km1w
For ad analytics, the simplest and most important metric is the page ad click count: websites rely on click counts to set pricing strategies and adjust how ads are promoted, and the counts also let them collect information about user preferences. More concretely, we can break the statistics down by the users' geographic location and learn which ads users in each province prefer, which supports more precisely targeted ad placement.

Requirement: count page ad clicks per province in a one-hour window that slides every 5 seconds. In addition, constrain each user's clicks over a period of time (say, one day): if a user clicks the same ad more than a given limit (say, 100 times), that user should be added to a blacklist and a warning emitted, and their later clicks should no longer be counted.
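For reference, each line of AdClickLog.csv carries five comma-separated fields, matching the AdClickEvent case class parsed in the code below: userId, adId, province, city, and a timestamp in seconds. The two sample lines here are made up for illustration, not actual rows from the dataset:

    1001,2001,beijing,beijing,1511658000
    1002,2002,guangdong,guangzhou,1511658060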
Ad Click Count Statistics
We implement the count in two ways: with a full-window ProcessWindowFunction (process) and with a pre-aggregation function (aggregate).
process
import java.sql.Timestamp
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Input and output case classes
case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class AdCountByProvince(province: String, windowEnd: String, count: Long)

object AdAnalysisByProvince {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val resource = getClass.getResource("/AdClickLog.csv")
    val adLogStream = env.readTextFile(resource.getPath)
      .map(data => {
        val dataArray = data.split(",")
        AdClickEvent(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)   // log timestamps are in seconds

    // Key by province, open sliding windows, and count
    val adCountStream = adLogStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .process(new AdCount())

    adCountStream.print()
    env.execute("ad analysis job")
  }
}

class AdCount() extends ProcessWindowFunction[AdClickEvent, AdCountByProvince, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[AdClickEvent],
                       out: Collector[AdCountByProvince]): Unit = {
    val province = key
    val windowEnd = new Timestamp(context.window.getEnd).toString
    val count = elements.size
    out.collect(AdCountByProvince(province, windowEnd, count))
  }
}
aggregate
The process version above keeps every event of a window in state and counts them only when the window fires. The aggregate version below instead pairs an incremental AggregateFunction with a WindowFunction, so only a single Long counter is kept per key and window:
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Input and output case classes
case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class AdCountByProvince(province: String, windowEnd: String, count: Long)

object AdAnalysisByProvince {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val resource = getClass.getResource("/AdClickLog.csv")
    val adLogStream = env.readTextFile(resource.getPath)
      .map(data => {
        val dataArray = data.split(",")
        AdClickEvent(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Key by province, open sliding windows, and count incrementally
    val adCountStream = adLogStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .aggregate(new AdCountAgg(), new AdCountResult())

    adCountStream.print()
    env.execute("ad analysis job")
  }
}

// Custom pre-aggregation function: IN is the mapped AdClickEvent, ACC and OUT are the running count
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// Custom window function: its input is the pre-aggregated result, a single Long
class AdCountResult() extends WindowFunction[Long, AdCountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                     out: Collector[AdCountByProvince]): Unit = {
    out.collect(AdCountByProvince(key, new Timestamp(window.getEnd).toString, input.head))
  }
}
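Both versions print the same results. With the 1-hour window sliding every 5 seconds, the output looks roughly like the following (province names, window ends, and counts here are illustrative, not actual results from the dataset):

    AdCountByProvince(beijing,2017-11-26 09:00:05.0,12)
    AdCountByProvince(guangdong,2017-11-26 09:00:05.0,8)
    AdCountByProvince(beijing,2017-11-26 09:00:10.0,15)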
Blacklist Filtering
Before the windowed count, we now key the stream by (userId, adId) and run it through a KeyedProcessFunction that counts each user's clicks on each ad, diverts offenders to a side output, and clears its state with a timer at midnight:
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Input and output case classes
case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class AdCountByProvince(province: String, windowEnd: String, count: Long)
// Case class for warning messages emitted to the side output
case class BlackListWarning(userId: Long, adId: Long, msg: String)

object AdAnalysisByProvinceBlack {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val resource = getClass.getResource("/AdClickLog.csv")
    val adLogStream = env.readTextFile(resource.getPath)
      .map(data => {
        val dataArray = data.split(",")
        AdClickEvent(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Filter out click-fraud behavior before counting
    val filterBlackListStream = adLogStream
      .keyBy(data => (data.userId, data.adId))   // key by (user id, ad id)
      .process(new FilterBlackList(100L))

    // Key by province, open sliding windows, and count
    val adCountStream = filterBlackListStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .aggregate(new AdCountAgg(), new AdCountResult())

    adCountStream.print()
    filterBlackListStream.getSideOutput(new OutputTag[BlackListWarning]("blacklist")).print("blacklist")
    env.execute("ad analysis job")
  }
}

// Custom KeyedProcessFunction implementing the blacklist filter
class FilterBlackList(maxClickCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent] {
  // State holding the current user's click count on the current ad
  lazy val countState: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("count", classOf[Long]))
  // Flag marking whether a warning for this user has already been sent (user is blacklisted)
  lazy val isSendState: ValueState[Boolean] = getRuntimeContext.getState(
    new ValueStateDescriptor[Boolean]("is-sent", classOf[Boolean]))

  override def processElement(value: AdClickEvent,
                              ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context,
                              out: Collector[AdClickEvent]): Unit = {
    // Read the current count from state
    val curCount = countState.value()
    // On the first event for this key, register a timer for 0:00 the next day to clear the state
    if (curCount == 0) {
      val ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24)
      ctx.timerService().registerProcessingTimeTimer(ts)
    }
    // If the count has reached the limit and no warning has been emitted yet, emit one to the side output
    if (curCount >= maxClickCount) {
      if (!isSendState.value()) {
        ctx.output(new OutputTag[BlackListWarning]("blacklist"),
          BlackListWarning(value.userId, value.adId, "click over " + maxClickCount + " times today"))
        isSendState.update(true)
      }
      return
    }
    // Otherwise increment the count and forward the event
    countState.update(curCount + 1)
    out.collect(value)
  }

  // The timer fires at midnight and simply clears the state
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext,
                       out: Collector[AdClickEvent]): Unit = {
    countState.clear()
    isSendState.clear()
  }
}

// Custom pre-aggregation function: IN is the mapped AdClickEvent, ACC and OUT are the running count
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// Custom window function: its input is the pre-aggregated result, a single Long
class AdCountResult() extends WindowFunction[Long, AdCountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                     out: Collector[AdCountByProvince]): Unit = {
    out.collect(AdCountByProvince(key, new Timestamp(window.getEnd).toString, input.head))
  }
}
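When a (user, ad) pair crosses the 100-click limit, only its first 100 clicks contribute to the province counts, and the side output prints a single warning per offender. With parallelism 1 the named print sink prefixes each line with "blacklist>", along these lines (the ids are illustrative):

    blacklist> BlackListWarning(1001,2001,click over 100 times today)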
I made a silly mistake that kept the blacklist from being output. I took this line:
val ts = (ctx.timerService().currentProcessingTime() / (1000*60*60*24) + 1) * (1000*60*60*24)
and wrote it as:
val ts = (ctx.timerService().currentProcessingTime() / ((1000*60*60*24) + 1) * (1000*60*60*24))