Flink-电商用户行为分析(订单支付实时监控-状态编程实现)_1
数据
链接:https://pan.baidu.com/s/1nSMh3JaNDW1SheQ5I4J0FQ
提取码:e49w
在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后超过这段时间仍未支付,订单就会被取消。
需求
实时监控订单支付情况:下单后 15 分钟内完成支付的订单视为支付成功,否则视为超时。
实现思路:事件来一条处理一条;正常完成支付的订单输出到主流,异常的订单(超时未支付、超时后才支付、找不到下单记录)输出到侧输出流。代码实现如下:
import org.apache.flink.api.common.state.{ ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{ OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object OrderTimeOutWithoutCep {

  /** Side output for orders that did not complete normally (timed out, paid late, or missing `create`). */
  val orderTimeOutputTag = new OutputTag[OrderResult]("orderTimeout")

  /**
   * Reads order events from a socket, keys them by order id and matches each order's
   * `create` and `pay` events with keyed state and event-time timers (no CEP).
   * Main-output results are printed with prefix "payed"; everything routed to
   * [[orderTimeOutputTag]] is printed with prefix "timeout".
   *
   * @param args optional `host port` of the socket source; defaults to `hadoop102 7777`
   */
  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse("hadoop102")
    val port = args.lift(1).map(_.trim.toInt).getOrElse(7777)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read order events: each line is split on ',' into the four OrderEvent constructor
    // arguments. eventTime is multiplied by 1000 below, so it appears to be in seconds.
    val orderEventStream = env.socketTextStream(host, port)
      .map { line =>
        val fields = line.split(",").map(_.trim)
        OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.orderId)

    // Detect timeouts with a keyed process function.
    val orderResultStream = orderEventStream.process(new OrderPayMatch())

    orderResultStream.print("payed")
    // The side output must be taken from the stream returned by process(), not from its input.
    orderResultStream.getSideOutput(orderTimeOutputTag).print("timeout")

    env.execute("order timeout without cep job")
  }

  /**
   * Matches the `create` and `pay` events of a single order (the key is the order id).
   *
   *  - `pay` strictly before the deadline (`create` time + `timeoutMs`) -> main output "payed successfully"
   *  - `pay` at or after the deadline                                  -> side output "payed but already timeout"
   *  - `create` whose `pay` has not arrived when the deadline timer fires -> side output "order timeout"
   *  - `pay` whose `create` has not arrived when its timer fires        -> side output "already payed but not found create log"
   *
   * Events with any other `eventType` are ignored.
   *
   * @param timeoutMs how long after `create` a `pay` is still accepted, in milliseconds
   *                  (defaults to 15 minutes)
   */
  class OrderPayMatch(timeoutMs: Long = 15 * 60 * 1000L)
      extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {

    // true when a `pay` arrived before its `create` (out-of-order input).
    lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(
      new ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))

    // Timestamp (ms) of the pending event-time timer; 0 (what an empty state reads as) means none.
    // After a `create` it is the payment deadline; after an early `pay` it is that pay's timestamp.
    lazy val timeState: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timer-state", classOf[Long]))

    override def processElement(i: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = {
      val isPayed = isPayedState.value()
      val timerTs = timeState.value()
      val eventTs = i.eventTime * 1000L

      i.eventType match {
        case "create" if isPayed =>
          // The pay arrived first and timerTs holds its timestamp. Apply the same deadline rule
          // as the in-order path instead of unconditionally reporting success.
          emitMatch(i.orderId, timerTs, eventTs + timeoutMs, context, collector)
          finish(timerTs, context)

        case "create" =>
          // Wait for the pay. If an earlier create already registered a timer, cancel it so that
          // the stale timer cannot fire later and wipe the state.
          if (timerTs > 0) context.timerService().deleteEventTimeTimer(timerTs)
          val deadline = eventTs + timeoutMs
          context.timerService().registerEventTimeTimer(deadline)
          timeState.update(deadline)

        case "pay" if timerTs > 0 && !isPayed =>
          // The create arrived earlier; timerTs is its payment deadline.
          emitMatch(i.orderId, eventTs, timerTs, context, collector)
          finish(timerTs, context)

        case "pay" if isPayed =>
          // Duplicate pay while still waiting for the create: keep the first pay's state and timer.

        case "pay" =>
          // Pay arrived before create (out-of-order input). A create should not be later than its
          // pay, so wait until the watermark passes the pay's timestamp; if no create has shown up
          // by then, onTimer reports it as missing.
          isPayedState.update(true)
          context.timerService().registerEventTimeTimer(eventTs)
          timeState.update(eventTs)

        case _ =>
          // Other event types are ignored.
      }
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
      // The event recorded in state is the one that arrived; its counterpart never came.
      val message =
        if (isPayedState.value()) "already payed but not found create log" // pay seen, create missing
        else "order timeout" // create seen, pay not received before the deadline
      ctx.output(orderTimeOutputTag, OrderResult(ctx.getCurrentKey, message))
      clearState()
    }

    /**
     * Emits the result of a matched create/pay pair: success to the main output when the pay
     * happened strictly before the deadline, otherwise a late-payment record to the side output.
     */
    private def emitMatch(orderId: Long, payTs: Long, deadline: Long,
                          context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context,
                          collector: Collector[OrderResult]): Unit =
      if (payTs < deadline) collector.collect(OrderResult(orderId, "payed successfully"))
      else context.output(orderTimeOutputTag, OrderResult(orderId, "payed but already timeout"))

    /** Cancels the pending timer at `timerTs` and clears all state of the current order. */
    private def finish(timerTs: Long,
                       context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context): Unit = {
      context.timerService().deleteEventTimeTimer(timerTs)
      clearState()
    }

    /** Clears both keyed state values of the current order. */
    private def clearState(): Unit = {
      isPayedState.clear()
      timeState.clear()
    }
  }
}
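犯了一个低级错误,花了很长时间才定位到问题:获取侧输出流时
写成了
orderEventStream.getSideOutput(orderTimeOutputTag).print("timeout")
正确的写法是
orderResultStream.getSideOutput(orderTimeOutputTag).print("timeout")
原因:侧输出流是由 process 算子输出的,必须从 process 返回的流(orderResultStream)上获取,而不是从它的输入流(orderEventStream)上获取。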
还没有评论,来说两句吧...