Flink-电商用户行为分析(订单支付实时监控-状态编程实现)_1

骑猪看日落 2023-02-25 07:17 82阅读 0赞

数据
链接:https://pan.baidu.com/s/1nSMh3JaNDW1SheQ5I4J0FQ
提取码:e49w
在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未付,订单就会被取消。
需求
用户支付,订单15分钟内,查看支付成功或超时的
来一条处理一条,正常的放主流,不正常的放侧输出流,
代码实现

  1. import org.apache.flink.api.common.state.{ ValueState, ValueStateDescriptor}
  2. import org.apache.flink.streaming.api.TimeCharacteristic
  3. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  4. import org.apache.flink.streaming.api.scala.{ OutputTag, StreamExecutionEnvironment}
  5. import org.apache.flink.streaming.api.scala._
  6. import org.apache.flink.util.Collector
  7. object OrderTimeOutWithoutCep {
  8. val orderTimeOutputTag = new OutputTag[OrderResult]("orderTimeout")
  9. def main(args: Array[String]): Unit = {
  10. val env = StreamExecutionEnvironment.getExecutionEnvironment
  11. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  12. env.setParallelism(1)
  13. //读取订单数据
  14. val orderEventStream = env.socketTextStream("hadoop102", 7777)
  15. .map(data => {
  16. val dataArray = data.split(",")
  17. OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
  18. })
  19. .assignAscendingTimestamps(_.eventTime * 1000L)
  20. .keyBy(_.orderId)
  21. //定义process function进行超时检测
  22. val orderResultStream = orderEventStream
  23. .process(new OrderPayMatch())
  24. orderResultStream.print("payed")
  25. orderResultStream.getSideOutput(orderTimeOutputTag).print("timeout")
  26. env.execute("order timeout without cep job")
  27. }
  28. class OrderPayMatch() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {
  29. //保存pay是否来过的状态
  30. lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean](
  31. "ispayed-state", classOf[Boolean]))
  32. //保存定时器的时间戳为状态
  33. lazy val timeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long](
  34. "timer-state", classOf[Long]))
  35. override def processElement(i: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = {
  36. //先读取状态
  37. val isPayed = isPayedState.value()
  38. val timerTs = timeState.value()
  39. //根据事件的类型进行分类判断,做不同的处理逻辑
  40. if (i.eventType == "create") {
  41. //1.如果是create事件,接下来判断pay是否来过
  42. if ( isPayed ) {
  43. //1.1如过已经pay过,匹配成功,输出主流,清空状态
  44. collector.collect(OrderResult(i.orderId, "payed successfully"))
  45. context.timerService().deleteEventTimeTimer(timerTs)
  46. isPayedState.clear()
  47. timeState.clear()
  48. } else {
  49. //如果没有pay过,注册定时器等待pay的到来
  50. val ts = i.eventTime * 1000L + 15 * 60 * 1000L
  51. context.timerService().registerEventTimeTimer(ts)
  52. timeState.update(ts)
  53. }
  54. } else if (i.eventType == "pay") {
  55. //2.如果是pay事件,那么判断是否create过,用timer表示
  56. if (timerTs > 0) {
  57. //2.1如果有定时器,说明已经有create来过
  58. //继续判断,是否超过了timeout时间
  59. if (timerTs > i.eventTime * 1000L) {
  60. //2.1.1如果定时器时间还没到,那么输出成功匹配 因为是pay先出现 然而前面create加15s 所以这里不加
  61. collector.collect(OrderResult(i.orderId, "payed successfully"))
  62. } else {
  63. //2.1.2如果当前pay的时间已经超时,那么输出到测输出流
  64. context.output(orderTimeOutputTag, OrderResult(i.orderId, "payed but already timeout"))
  65. }
  66. //输出结束,清空状态
  67. context.timerService().deleteEventTimeTimer(timerTs)
  68. isPayedState.clear()
  69. timeState.clear()
  70. } else {
  71. //2.2pay先到了,更新状态,注册定时器等待create 乱序情况
  72. //定时器时间戳:等到水位到了watermark=value.event*1000L 时看create来没来,如果没来则create丢失,因为create应该在pay之前
  73. isPayedState.update(true)
  74. context.timerService().registerEventTimeTimer(i.eventTime * 1000L)
  75. timeState.update(i.eventTime * 1000L)
  76. }
  77. }
  78. }
  79. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
  80. //根据状态的值,判断哪个数据没来
  81. if (isPayedState.value()) {
  82. //如果为true 表示pay先到,没等到create
  83. ctx.output(orderTimeOutputTag, OrderResult(ctx.getCurrentKey, "already payed but not found create log"))
  84. } else {
  85. //表示create到了,没等到pay
  86. ctx.output(orderTimeOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
  87. }
  88. isPayedState.clear()
  89. timeState.clear()
  90. }
  91. }
  92. }

在这里插入图片描述
在这里插入图片描述
犯了一个低级错误,导致花了很长时间找错误
写成了

  1. orderEventStream.getSideOutput(orderTimeOutputTag).print("timeout")

正确是

  1. orderResultStream.getSideOutput(orderTimeOutputTag).print("timeout")

发表评论

表情:
评论列表 (有 0 条评论,82人围观)

还没有评论,来说两句吧...

相关阅读