Flink-电商用户行为分析(实时对账)

叁歲伎倆 2022-11-22 00:23 318阅读 0赞
  1. 链接:https://pan.baidu.com/s/1_DJmEPtNxsCiDnw8KNwmoA
  2. 提取码:exq9

对于订单支付事件,用户支付完成并不算结束,我们还需要确认平台账户上是否已经到账。而这两类信息往往来自不同的日志,所以需要同时读入两条流的数据并进行合并处理。这里既可以利用connect将两条流连接起来,再用CoProcessFunction进行处理,也可以使用intervalJoin。
接下来我将分别使用两种方法实现(1. connect + CoProcessFunction,2. intervalJoin)。
connect代码实现

  1. import org.apache.flink.api.common.state.{ ValueState, ValueStateDescriptor}
  2. import org.apache.flink.streaming.api.TimeCharacteristic
  3. import org.apache.flink.streaming.api.functions.co.CoProcessFunction
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  5. import org.apache.flink.streaming.api.scala._
  6. import org.apache.flink.streaming.api.windowing.time.Time
  7. import org.apache.flink.util.Collector
  /**
   * Real-time reconciliation job: reads order events and receipt events from
   * two CSV resources, keys both streams by transaction id, connects them and
   * lets [[OrderPayTxDetect]] pair them up. Matched pairs are printed from the
   * main output; events tagged "unmatched-pays" / "unmatched-receipts" are
   * printed from the corresponding side outputs.
   */
  object OrderPayTxMatch {

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      // Order stream: parse each CSV line into an OrderEvent.
      val orderLogPath = getClass.getResource("/OrderLog.csv").getPath
      val parsedOrders = env.readTextFile(orderLogPath).map { line =>
        val fields = line.split(",")
        OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      // Event time comes from eventTime (scaled by 1000); up to 3 seconds of
      // out-of-orderness is tolerated by the watermark.
      val timestampedOrders = parsedOrders.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
          override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L
        }
      )
      // Only events carrying a transaction id can be reconciled.
      val orderEventStream = timestampedOrders
        .filter(order => order.txId != "")
        .keyBy(order => order.txId)

      // Receipt stream: parse each CSV line into a ReceiptEvent; timestamps
      // are treated as ascending.
      val receiptLogPath = getClass.getResource("/ReceiptLog.csv").getPath
      val parsedReceipts = env.readTextFile(receiptLogPath).map { line =>
        val fields = line.split(",")
        ReceiptEvent(fields(0), fields(1), fields(2).toLong)
      }
      val receiptStream = parsedReceipts
        .assignAscendingTimestamps(receipt => receipt.timestamp * 1000L)
        .keyBy(receipt => receipt.txId)

      // Connect the two keyed streams and match events per transaction id.
      val connectedStreams = orderEventStream.connect(receiptStream)
      val resultStream = connectedStreams.process(new OrderPayTxDetect())

      // Side-output tags; the ids must equal the ones used inside OrderPayTxDetect.
      val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
      val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

      resultStream.print()
      val unmatchedPayStream = resultStream.getSideOutput(unmatchedPays)
      unmatchedPayStream.print("unmatched-pays")
      val unmatchedReceiptStream = resultStream.getSideOutput(unmatchedReceipts)
      unmatchedReceiptStream.print("unmatched-receipts")

      env.execute("order pay tx match job")
    }
  }
  /**
   * CoProcessFunction that pairs a payment event (first input) with the
   * receipt event (second input) for the current key.
   *
   * Whichever side arrives first is stored in keyed state and an event-time
   * timer is registered. If the other side arrives before the timer fires,
   * the pair is emitted on the main output, the stored state is cleared and
   * the pending timer is deleted. If the timer fires first, the stored event
   * is emitted on the matching side output ("unmatched-pays" or
   * "unmatched-receipts").
   *
   * Fix over the previous version: state was not cleared after a successful
   * match, so the timer later fired and reported an already-matched event as
   * unmatched.
   *
   * @param payWaitMs     how long (ms of event time) a payment waits for its
   *                      receipt; defaults to the previously hard-coded 5000
   * @param receiptWaitMs how long (ms of event time) a receipt waits for its
   *                      payment; defaults to the previously hard-coded 3000
   */
  class OrderPayTxDetect(payWaitMs: Long = 5000L, receiptWaitMs: Long = 3000L)
      extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {

    // Keyed state holding the payment / receipt that is still waiting for its counterpart.
    lazy val payState: ValueState[OrderEvent] =
      getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
    lazy val receiptState: ValueState[ReceiptEvent] =
      getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

    // Side-output tags for events whose counterpart never arrived in time.
    val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
    val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

    /** Event-time timestamp at which a stored payment is given up on. */
    private def payDeadline(pay: OrderEvent): Long = pay.eventTime * 1000L + payWaitMs

    /** Event-time timestamp at which a stored receipt is given up on. */
    private def receiptDeadline(receipt: ReceiptEvent): Long = receipt.timestamp * 1000L + receiptWaitMs

    /**
     * Handles a payment event: emits the pair if the receipt is already
     * stored, otherwise stores the payment and starts its wait timer.
     */
    override def processElement1(
        pay: OrderEvent,
        ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
        out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      Option(receiptState.value()) match {
        case Some(receipt) =>
          // Receipt arrived earlier: emit the match, then drop the stored
          // receipt and its timer so it is not reported as unmatched later.
          out.collect((pay, receipt))
          receiptState.clear()
          ctx.timerService().deleteEventTimeTimer(receiptDeadline(receipt))
        case None =>
          // No receipt yet: remember the payment and wait for the receipt.
          payState.update(pay)
          ctx.timerService().registerEventTimeTimer(payDeadline(pay))
      }
    }

    /**
     * Handles a receipt event: emits the pair if the payment is already
     * stored, otherwise stores the receipt and starts its wait timer.
     */
    override def processElement2(
        receipt: ReceiptEvent,
        ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
        out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      Option(payState.value()) match {
        case Some(pay) =>
          // Payment arrived earlier: emit the match, then drop the stored
          // payment and its timer so it is not reported as unmatched later.
          out.collect((pay, receipt))
          payState.clear()
          ctx.timerService().deleteEventTimeTimer(payDeadline(pay))
        case None =>
          // No payment yet: remember the receipt and wait for the payment.
          receiptState.update(receipt)
          ctx.timerService().registerEventTimeTimer(receiptDeadline(receipt))
      }
    }

    /**
     * Fires when a wait period has elapsed. Whatever is still stored never
     * found its counterpart, so it is sent to the corresponding side output;
     * both states are then cleared.
     */
    override def onTimer(
        timestamp: Long,
        ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext,
        out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      // A payment still in state means its receipt did not arrive in time.
      Option(payState.value()).foreach(pay => ctx.output(unmatchedPays, pay))
      // A receipt still in state means its payment did not arrive in time.
      Option(receiptState.value()).foreach(receipt => ctx.output(unmatchedReceipts, receipt))
      // Clear state.
      payState.clear()
      receiptState.clear()
    }
  }

在这里插入图片描述
join代码实现

  1. import org.apache.flink.streaming.api.TimeCharacteristic
  2. import org.apache.flink.streaming.api.functions.ProcessFunction
  3. import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
  4. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  5. import org.apache.flink.streaming.api.scala._
  6. import org.apache.flink.streaming.api.windowing.time.Time
  7. import org.apache.flink.util.Collector
  /**
   * One record of the order log.
   *
   * @param orderId   order identifier (first CSV column)
   * @param eventType event type string (second CSV column)
   * @param txId      transaction id used as the join key; records with an
   *                  empty txId are filtered out by the jobs
   * @param eventTime event time; the jobs multiply it by 1000 to obtain the
   *                  event-time timestamp
   */
  case class OrderEvent(
      orderId: Long,
      eventType: String,
      txId: String,
      eventTime: Long
  )

  /**
   * One record of the receipt log.
   *
   * @param txId       transaction id used as the join key (first CSV column)
   * @param payChannel payment channel string (second CSV column)
   * @param timestamp  receipt time; the jobs multiply it by 1000 to obtain
   *                   the event-time timestamp
   */
  case class ReceiptEvent(
      txId: String,
      payChannel: String,
      timestamp: Long
  )
  /**
   * Reconciliation job built on an interval join: order events and receipt
   * events are keyed by transaction id and joined when the receipt's
   * timestamp lies between 3 seconds before and 5 seconds after the order's
   * timestamp. Only matched pairs are produced and printed.
   */
  object OrderPayTxMatchWithJoin {

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      // Read the order log and convert every line into an OrderEvent.
      val orderLogUrl = getClass.getResource("/OrderLog.csv")
      val orderLines = env.readTextFile(orderLogUrl.getPath)
      val orderEventStream = orderLines
        .map { csvLine =>
          val columns = csvLine.split(",")
          OrderEvent(columns(0).toLong, columns(1), columns(2), columns(3).toLong)
        }
        .assignTimestampsAndWatermarks(
          // Watermark lags 3 seconds behind the largest timestamp seen.
          new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
            override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L
          }
        )
        .filter(event => event.txId != "")
        .keyBy(event => event.txId)

      // Read the receipt log and convert every line into a ReceiptEvent.
      val receiptLogUrl = getClass.getResource("/ReceiptLog.csv")
      val receiptLines = env.readTextFile(receiptLogUrl.getPath)
      val receiptEventStream = receiptLines
        .map { csvLine =>
          val columns = csvLine.split(",")
          ReceiptEvent(columns(0), columns(1), columns(2).toLong)
        }
        .assignAscendingTimestamps(event => event.timestamp * 1000L)
        .keyBy(event => event.txId)

      // Interval-join the two keyed streams.
      val lowerBound = Time.seconds(-3)
      val upperBound = Time.seconds(5)
      val resultStream = orderEventStream
        .intervalJoin(receiptEventStream)
        .between(lowerBound, upperBound)
        .process(new OrderPayTxDetectWithJoin())

      resultStream.print()
      env.execute("order pay tx match with join job")
    }
  }
  /**
   * ProcessJoinFunction that forwards every joined (order, receipt) pair
   * unchanged to the output.
   */
  class OrderPayTxDetectWithJoin()
      extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {

    /** Emits the joined pair as a tuple. */
    override def processElement(
        left: OrderEvent,
        right: ReceiptEvent,
        ctx: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
        out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      val matchedPair: (OrderEvent, ReceiptEvent) = (left, right)
      out.collect(matchedPair)
    }
  }

在这里插入图片描述

总结

虽然join用起来方便、简单,但是有局限性:intervalJoin只能输出匹配上的数据,不能输出没有匹配上的数据;如果需要输出未匹配的事件,仍然要使用connect配合CoProcessFunction和侧输出流来实现。

发表评论

表情:
评论列表 (有 0 条评论,318人围观)

还没有评论,来说两句吧...

相关阅读