Flink-电商用户行为分析(实时对账)
链接:https://pan.baidu.com/s/1_DJmEPtNxsCiDnw8KNwmoA
提取码:exq9
对于订单支付事件,用户支付完成并不算结束,我们还需要确认平台账户上是否已经到账。而这两类信息往往来自不同的日志,所以我们要同时读入两条流的数据来做合并处理。这里可以利用connect将两条流连接起来,然后用CoProcessFunction进行处理,也可以使用interval join。
接下来我将分别使用两种方法实现(1.connect + CoProcessFunction,2.使用interval join操作)
connect代码实现
import org.apache.flink.api.common.state.{ ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * Reconciliation job based on `connect` + [[OrderPayTxDetect]].
 *
 * Reads order events and receipt events from two CSV resources, keys both
 * streams by `txId`, connects them, and prints the matched pairs together with
 * the two side outputs ("unmatched-pays" and "unmatched-receipts").
 */
object OrderPayTxMatch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Side-output tags; the ids must be identical to the ones used inside OrderPayTxDetect.
    val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
    val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

    // ---- order (payment) side: CSV line -> OrderEvent ----
    val orderLogPath = getClass.getResource("/OrderLog.csv").getPath
    val parsedOrders = env.readTextFile(orderLogPath).map { line =>
      val fields = line.split(",")
      OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
    }
    // Watermarks trail the largest seen timestamp by 3 seconds; eventTime is scaled by 1000.
    val orderTimestamps =
      new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L
      }
    val orderEventStream = parsedOrders
      .assignTimestampsAndWatermarks(orderTimestamps)
      .filter(order => order.txId != "") // keep only events that carry a transaction id
      .keyBy(order => order.txId)

    // ---- receipt side: CSV line -> ReceiptEvent ----
    val receiptLogPath = getClass.getResource("/ReceiptLog.csv").getPath
    val parsedReceipts = env.readTextFile(receiptLogPath).map { line =>
      val fields = line.split(",")
      ReceiptEvent(fields(0), fields(1), fields(2).toLong)
    }
    val receiptStream = parsedReceipts
      .assignAscendingTimestamps(receipt => receipt.timestamp * 1000L)
      .keyBy(receipt => receipt.txId)

    // Connect the two keyed streams and run the matching function.
    val resultStream = orderEventStream
      .connect(receiptStream)
      .process(new OrderPayTxDetect())

    resultStream.print()
    resultStream.getSideOutput(unmatchedPays).print("unmatched-pays")
    resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts")

    env.execute("order pay tx match job")
  }
}
//定义CoProcessFunction,实现两条流数据的匹配检测
/**
 * Matches payment events (input 1) with receipt events (input 2) per key.
 *
 * The first event to arrive for a key is buffered in keyed state and an
 * event-time timer is registered: a pay waits 5 seconds (after its own event
 * time) for a receipt, a receipt waits 3 seconds for a pay. When the partner
 * arrives in time the pair is emitted on the main output; when the timer fires
 * first, the buffered event is emitted on the "unmatched-pays" or
 * "unmatched-receipts" side output.
 *
 * On a successful match the buffered state is cleared and the pending timer is
 * deleted. Without this, the timer would later fire with the already-matched
 * event still in state and wrongly report it as unmatched.
 */
class OrderPayTxDetect() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {

  // Keyed state: the pay event that is still waiting for its receipt (null when none).
  lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(
    new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))

  // Keyed state: the receipt event that is still waiting for its pay (null when none).
  lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(
    new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

  val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

  // Timer timestamp for a buffered pay: its event time (scaled by 1000) plus a 5 second wait.
  private def payTimeout(pay: OrderEvent): Long = pay.eventTime * 1000L + 5000L

  // Timer timestamp for a buffered receipt: its timestamp (scaled by 1000) plus a 3 second wait.
  private def receiptTimeout(receipt: ReceiptEvent): Long = receipt.timestamp * 1000L + 3000L

  /** Handles a pay event: match it with a buffered receipt, or buffer it and start waiting. */
  override def processElement1(
      pay: OrderEvent,
      ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
      out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(receiptState.value()) match {
      case Some(receipt) =>
        // The receipt arrived earlier: emit the pair and release everything held for it.
        out.collect((pay, receipt))
        receiptState.clear()
        ctx.timerService().deleteEventTimeTimer(receiptTimeout(receipt))
      case None =>
        // No receipt yet: remember the pay and wait for the receipt.
        payState.update(pay)
        ctx.timerService().registerEventTimeTimer(payTimeout(pay))
    }
  }

  /** Handles a receipt event: match it with a buffered pay, or buffer it and start waiting. */
  override def processElement2(
      receipt: ReceiptEvent,
      ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
      out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(payState.value()) match {
      case Some(pay) =>
        // The pay arrived earlier: emit the pair and release everything held for it.
        out.collect((pay, receipt))
        payState.clear()
        ctx.timerService().deleteEventTimeTimer(payTimeout(pay))
      case None =>
        // No pay yet: remember the receipt and wait for the pay.
        receiptState.update(receipt)
        ctx.timerService().registerEventTimeTimer(receiptTimeout(receipt))
    }
  }

  /** Timer fired: whatever is still buffered never found its partner, so report it and reset. */
  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext,
      out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // A buffered pay means its receipt never came.
    Option(payState.value()).foreach(pay => ctx.output(unmatchedPays, pay))
    // A buffered receipt means its pay never came.
    Option(receiptState.value()).foreach(receipt => ctx.output(unmatchedReceipts, receipt))
    // Clear the state for this key.
    payState.clear()
    receiptState.clear()
  }
}
join代码实现
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * One record of the order log, built from a comma-separated line.
 *
 * @param orderId   order identifier (first column, parsed as Long)
 * @param eventType event type label (second column)
 * @param txId      transaction id (third column); the jobs in this file key the stream by it
 * @param eventTime event time (fourth column); the jobs multiply it by 1000 to obtain the
 *                  event timestamp, so it is presumably in seconds
 */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)
/**
 * One record of the receipt log, built from a comma-separated line.
 *
 * @param txId       transaction id (first column); the jobs in this file key the stream by it
 * @param payChannel payment channel label (second column)
 * @param timestamp  receipt time (third column); the jobs multiply it by 1000 to obtain the
 *                   event timestamp, so it is presumably in seconds
 */
case class ReceiptEvent(
  txId: String,
  payChannel: String,
  timestamp: Long
)
/**
 * Reconciliation job based on an interval join.
 *
 * Reads order events and receipt events from two CSV resources, keys both
 * streams by `txId`, and joins every order with the receipts whose timestamp
 * lies between 3 seconds before and 5 seconds after the order's timestamp.
 * Only the joined pairs are printed.
 */
object OrderPayTxMatchWithJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Reads a classpath resource line by line.
    def linesOf(resourceName: String): DataStream[String] = {
      val location = getClass.getResource(resourceName)
      env.readTextFile(location.getPath)
    }

    // Order side: parse, assign timestamps (watermark lag of 3 seconds), drop empty txId, key by txId.
    val orders = linesOf("/OrderLog.csv").map { row =>
      val cols = row.split(",")
      OrderEvent(cols(0).toLong, cols(1), cols(2), cols(3).toLong)
    }
    val orderEventStream = orders
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
          override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L
        })
      .filter(event => event.txId != "")
      .keyBy(event => event.txId)

    // Receipt side: parse, assign ascending timestamps, key by txId.
    val receipts = linesOf("/ReceiptLog.csv").map { row =>
      val cols = row.split(",")
      ReceiptEvent(cols(0), cols(1), cols(2).toLong)
    }
    val receiptEventStream = receipts
      .assignAscendingTimestamps(event => event.timestamp * 1000L)
      .keyBy(event => event.txId)

    // Interval join of the two keyed streams.
    val joined = orderEventStream.intervalJoin(receiptEventStream)
    val bounded = joined.between(Time.seconds(-3), Time.seconds(5))
    val resultStream = bounded.process(new OrderPayTxDetectWithJoin())

    resultStream.print()
    env.execute("order pay tx match with join job")
  }
}
//自定义ProcessJoinFunction
/**
 * Join function that forwards every joined (order, receipt) pair unchanged.
 */
class OrderPayTxDetectWithJoin()
    extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {

  /** Emits the joined pair as a tuple; `ctx` is not used. */
  override def processElement(
      left: OrderEvent,
      right: ReceiptEvent,
      ctx: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
      out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    val matchedPair = (left, right)
    out.collect(matchedPair)
  }
}
总结
虽然interval join用起来方便、简单,但是有局限性:它只能输出匹配成功的事件对,不能输出没有匹配上的事件;如果需要输出未匹配的数据,应使用connect + CoProcessFunction的方式。
还没有评论,来说两句吧...