Flink E-commerce User Behavior Analysis (Kafka)


This article builds on https://blog.csdn.net/qq_46548855/article/details/107144990 and switches the data source to Kafka.
Depending on actual needs, the sink can also be pointed at Kafka, Elasticsearch, Redis, or another system, as in the sketch below.
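
As a minimal sketch of a Kafka sink (assuming the `processedStream` result stream of type DataStream[String] built in the final code below, and an output topic named `hotitems-output`, which is an assumed name for illustration, not part of the original code):

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

    // Hypothetical Kafka sink: write the formatted Top-N result strings to another topic.
    // "hotitems-output" is an assumed topic name.
    processedStream.addSink(
      new FlinkKafkaProducer[String]("hadoop102:9092", "hotitems-output", new SimpleStringSchema()))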

In a real production environment, the data stream usually comes from Kafka. To bring the code closer to production practice, all we need to do is swap the source for Kafka:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val stream = env
      .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))

Final code

    import java.sql.Timestamp
    import java.util.Properties

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.util.Collector

    import scala.collection.mutable.ListBuffer

    // Case class for the input data
    case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

    // Item view count (output type of the window operation)
    case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

    object kafkahot {
      def main(args: Array[String]): Unit = {
        // Create the environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Kafka consumer configuration
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "hadoop102:9092")
        properties.setProperty("group.id", "consumer-group")
        properties.setProperty("key.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("value.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("auto.offset.reset", "latest")

        // 2. Read the data, e.g. 834377,4541270,3738615,pv,1511658000
        val dataStream = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
          .map(data => {
            val dataArray = data.split(",")
            UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt,
              dataArray(3).trim, dataArray(4).trim.toLong)
          })
          .assignAscendingTimestamps(_.timestamp * 1000L)

        // Transform the data
        val processedStream = dataStream
          .filter(_.behavior == "pv")
          .keyBy(_.itemId)
          .timeWindow(Time.hours(1), Time.minutes(5))
          .aggregate(new CountAgg(), new WindowResult())
          .keyBy(_.windowEnd) // group by window
          .process(new TopNHot(3))

        // Sink: print to the console
        processedStream.print()
        env.execute("hot items job")
      }
    }

    // Custom pre-aggregation function: counts elements per key and window
    class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
      override def createAccumulator(): Long = 0L
      override def add(in: UserBehavior, acc: Long): Long = acc + 1
      override def getResult(acc: Long): Long = acc
      override def merge(acc: Long, acc1: Long): Long = acc + acc1
    }

    // Custom pre-aggregation function: computes an average (defined for demonstration, not used in the job above)
    class AverageAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] {
      override def createAccumulator(): (Long, Int) = (0L, 0)
      override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1 + in.timestamp, acc._2 + 1)
      override def getResult(acc: (Long, Int)): Double = acc._1.toDouble / acc._2
      override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1 + acc1._1, acc._2 + acc1._2)
    }

    // Custom window function that emits ItemViewCount:
    // the first Long is the pre-aggregated count, the key Long is the itemId
    class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
      override def apply(key: Long, window: TimeWindow, input: Iterable[Long],
                         out: Collector[ItemViewCount]): Unit = {
        out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
      }
    }

    // Custom process function; the key is the windowEnd, hence the Long key type
    class TopNHot(topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
      private var itemState: ListState[ItemViewCount] = _

      override def open(parameters: Configuration): Unit = {
        itemState = getRuntimeContext.getListState(
          new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
      }

      // When the timer fires, sort all buffered counts and emit the result
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
                           out: Collector[String]): Unit = {
        // Pull all data out of the state into a list buffer
        val allItems: ListBuffer[ItemViewCount] = new ListBuffer()
        import scala.collection.JavaConversions._
        for (item <- itemState.get()) {
          allItems += item
        }
        // Sort by count in descending order and take the top N
        val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
        // Clear the state; if the formatting below is not needed, out.collect(sortedItems.toString()) would do
        itemState.clear()
        // Format the ranking output; timestamp - 1 undoes the +1 used when the timer was registered
        val result: StringBuilder = new StringBuilder()
        result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
        // Output every item's information
        for (i <- sortedItems.indices) {
          val currentItem = sortedItems(i)
          result.append("No").append(i + 1).append(":")
            .append("商品ID =").append(currentItem.itemId)
            .append("浏览量").append(currentItem.count)
            .append("\n")
        }
        result.append("=====================")
        // Throttle the output rate
        Thread.sleep(1000)
        out.collect(result.toString())
      }

      override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
                                  collector: Collector[String]): Unit = {
        // Buffer every record into the state list
        itemState.add(i)
        // Register an event-time timer for one millisecond after the window end
        context.timerService().registerEventTimeTimer(i.windowEnd + 1)
      }
    }
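
The AverageAgg function above is defined but never wired into the job. As a minimal sketch of how it could be used (the `AvgWindowResult` window function and the `avgStream` name are assumptions for illustration, not part of the original code):

    // Hypothetical window function pairing with AverageAgg, whose result type is Double.
    class AvgWindowResult() extends WindowFunction[Double, (Long, Long, Double), Long, TimeWindow] {
      override def apply(key: Long, window: TimeWindow, input: Iterable[Double],
                         out: Collector[(Long, Long, Double)]): Unit = {
        // Emit (itemId, windowEnd, average value) for each window
        out.collect((key, window.getEnd, input.iterator.next()))
      }
    }

    // Assumed usage inside main(), alongside the existing pipeline
    val avgStream = dataStream
      .filter(_.behavior == "pv")
      .keyBy(_.itemId)
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new AverageAgg(), new AvgWindowResult())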

Create a producer (to simulate data)

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object KafkaProducerUtil {
      def main(args: Array[String]): Unit = {
        writeToKafkaWithTopic("hotitems")
      }

      def writeToKafkaWithTopic(topic: String): Unit = {
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "hadoop102:9092")
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // Create a KafkaProducer and use it to send the data
        val producer = new KafkaProducer[String, String](properties)
        // Read the data from the file and send it line by line
        val bufferedSource = io.Source.fromFile("D:\\idea\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
        for (line <- bufferedSource.getLines()) {
          val record = new ProducerRecord[String, String](topic, line)
          producer.send(record)
        }
        producer.close()
      }
    }

Start the consumer (the Flink job) first, then start the producer.
