Flink E-commerce User Behavior Analysis (Kafka)
This article builds on https://blog.csdn.net/qq_46548855/article/details/107144990 and switches the data source to Kafka. Depending on actual needs, the sink could likewise be pointed at Kafka, ES, Redis, or another system (a Kafka sink sketch is shown after the final code below).
In a real production environment, the data stream usually comes from Kafka. To bring the code closer to production, we only need to replace the source with a Kafka consumer:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop102:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream = env
.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
Final code
import java.sql.Timestamp
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
// Case class for the input data
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// Item click count (output type of the window aggregation)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object kafkahot {
def main(args: Array[String]): Unit = {
// Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Configure the Kafka consumer
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop102:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
// 2. Read the data; a sample record: 834377,4541270,3738615,pv,1511658000
val dataStream = env.addSource(new FlinkKafkaConsumer[String]("hotitems",new SimpleStringSchema(),properties))
.map(data=>{
val dataArray = data.split(",")
UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,
dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
// Transform: aggregate per item in sliding windows, then rank per window
val processedStream = dataStream
.filter(_.behavior == "pv")
.keyBy(_.itemId)
.timeWindow(Time.hours(1),Time.minutes(5))
.aggregate(new CountAgg(), new WindowResult())
.keyBy(_.windowEnd) // group by window end
.process(new TopNHot(3))
// Sink: print to the console
processedStream.print()
env.execute("hot items job")
}
}
// Custom pre-aggregation function: counts the elements in the window
class CountAgg() extends AggregateFunction[UserBehavior,Long,Long]{
override def createAccumulator(): Long = 0L
override def add(in: UserBehavior, acc: Long): Long = acc + 1
override def getResult(acc: Long): Long = acc
override def merge(acc: Long, acc1: Long): Long = acc + acc1
}
// Custom pre-aggregation function that computes an average
class AverageAgg() extends AggregateFunction[UserBehavior,(Long,Int),Double]{
override def createAccumulator(): (Long, Int) = (0L,0)
override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1 + in.timestamp,acc._2 + 1)
override def getResult(acc: (Long, Int)): Double = acc._1.toDouble / acc._2
override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1 ,acc._2+acc1._2)
}
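// Note: AverageAgg is not used in the job above; it only demonstrates an
// AggregateFunction whose accumulator type differs from its output type.
// A hypothetical way to apply it (average event timestamp per item and window):
//   dataStream.keyBy(_.itemId)
//     .timeWindow(Time.hours(1), Time.minutes(5))
//     .aggregate(new AverageAgg())   // would yield a DataStream[Double]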
// Custom window function emitting ItemViewCount; the first Long is the pre-aggregated count, the third Long is the key (itemId)
class WindowResult() extends WindowFunction[Long,ItemViewCount,Long,TimeWindow]{
override def apply(key: Long, window: TimeWindow, input: Iterable[Long],
out: Collector[ItemViewCount]): Unit = {
out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))
}
}
// Custom process function; the key is windowEnd, hence the first type parameter is Long
class TopNHot(topSize:Int) extends KeyedProcessFunction[Long,ItemViewCount,String]{
private var itemState: ListState[ItemViewCount] = _
override def open(parameters: Configuration): Unit = {
itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
}
// When the timer fires, sort all buffered data and emit the result
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
out: Collector[String]): Unit = {
// Take all data out of the state and put it into a list buffer
val allItems:ListBuffer[ItemViewCount] = new ListBuffer()
import scala.collection.JavaConversions._
for (item <- itemState.get()){
allItems += item
}
// Sort by count in descending order and take the top N
val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
// Clear the state; if you don't need the formatting below, you could simply call out.collect(sortedItems.toString())
itemState.clear()
// Format the ranking result; timestamp - 1 undoes the + 1 used when registering the timer
val result:StringBuilder = new StringBuilder()
result.append("时间:").append(new Timestamp(timestamp-1)).append("\n")
// Output the information for each item
for (i <- sortedItems.indices){
val currentItem = sortedItems(i)
result.append("No").append(i+1).append(":")
.append("商品ID =").append(currentItem.itemId)
.append("浏览量").append(currentItem.count)
.append("\n")
}
result.append("=====================")
// Throttle the output rate
Thread.sleep(1000)
out.collect(result.toString())
}
override def processElement(i: ItemViewCount, context: KeyedProcessFunction
[Long, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
// Add each incoming record to the state list
itemState.add(i)
// Register an event-time timer for windowEnd + 1
context.timerService().registerEventTimeTimer(i.windowEnd + 1)
}
}
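Here the result is simply printed to the console. As mentioned at the beginning, the sink could just as well be Kafka. A minimal sketch using the same connector version follows; the output topic name "hotitems-result" is only an assumption for illustration:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
// Write the formatted ranking strings back to Kafka instead of printing them
// ("hotitems-result" is a hypothetical output topic).
processedStream.addSink(new FlinkKafkaProducer[String]("hadoop102:9092", "hotitems-result", new SimpleStringSchema()))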
Creating the producer (to simulate the data)
import java.util.Properties
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord}
object KafkaProducerUtil {
def main(args: Array[String]): Unit = {
writeToKafkaWithTopic("hotitems")
}
def writeToKafkaWithTopic(topic: String):Unit={
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop12:9092")
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// Create a KafkaProducer and use it to send the data
val producer = new KafkaProducer[String, String](properties)
// Read the data from a file and send it record by record
val bufferedSource = io.Source.fromFile("D:\\idea\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
for (line <- bufferedSource.getLines()){
val record = new ProducerRecord[String, String](topic,line)
producer.send(record)
}
producer.close()
}
}
Start the consumer (the Flink job) first, then the producer, so the job reads the records as they are pushed to the hotitems topic.