// Flink interval join code example (blog post by 以你之姓, published 2022-09-06 10:19).
package org.apache.flink.table.planner.runtime.stream.sql

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.mutable

/**
 * Experiments with SQL interval joins (LEFT OUTER JOIN with a time-bounded predicate).
 *
 * The class is run by JUnit's `Parameterized` runner with a `StateBackendMode` argument;
 * the parameter values presumably come from [[StreamingWithStateTestBase]] — TODO confirm.
 */
@RunWith(classOf[Parameterized])
class WppIntervalJoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {

  /**
   * Processing-time LEFT OUTER interval join.
   *
   * A `T1` row joins the `T2` rows with the same `a` whose processing time lies in
   * `[t1.proctime - 3s, t1.proctime + 5s]` (the SQL predicate rearranged).
   *
   * Processing time depends on wall-clock timing, so the output is not deterministic.
   * The result is only printed for manual inspection; no assertion is made.
   */
  @Test
  def testProcTimeLeftOuterJoin(): Unit = {
    env.setParallelism(1)

    // Alternative projection from an earlier experiment (same join condition):
    //   SELECT t2.a , t2.c, t1.c t1c,t1.b t1b
    //   FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
    //     t1.a = t2.a AND
    //     t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
    //       t2.proctime + INTERVAL '3' SECOND
    val sqlQuery =
      """
        |SELECT t1.a,t1.b,t1.c,'----' as test, t2.b,t2.c
        |FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
        |  t1.a = t2.a AND
        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
        |    t2.proctime + INTERVAL '3' SECOND
        |""".stripMargin

    val data1: Seq[(Int, Long, String)] = Seq(
      (1, 1L, "Hi1"),
      (1, 2L, "Hi2"),
      (1, 5L, "Hi3"),
      (2, 7L, "Hi5"))

    val data2: Seq[(Int, Long, String)] = Seq(
      (1, 1L, "HiHi"),
      (2, 2L, "HeHe"),
      (2, 3L, "HeHe3"),
      (2, 10L, "HeHe3"))

    val t1 = env.fromCollection(data1)
      .toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    val t2 = env.fromCollection(data2)
      .toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)

    // `registerTable` is deprecated; `createTemporaryView` is its replacement.
    tEnv.createTemporaryView("T1", t1)
    tEnv.createTemporaryView("T2", t2)

    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    println("====")
    result.print()
    env.execute()
  }

  /**
   * Event-time (row-time) LEFT OUTER interval join. Despite the method name, this test uses
   * `rowtime`, not processing time.
   *
   * Both inputs get punctuated watermarks one millisecond behind each element, so no row is
   * late and the result is deterministic.
   *
   * Expected matches (timestamps shown as seconds past 1629369100). The predicate means a
   * `T2` row qualifies when `t2.rowtime` is in `[t1.rowtime - 3s, t1.rowtime + 5s]`,
   * with both bounds inclusive:
   *  - Hi1 (01) -> HiHi (01), HeHe (02)
   *  - Hi2 (02) -> HiHi (01), HeHe (02)
   *  - Hi3 (05) -> HeHe (02), which is exactly on the inclusive lower bound
   *  - Hi5 (07) -> 2HeHe3 (08), 2HeHe33 (09)
   *
   * Every left row finds a partner, so no null-padded rows are expected.
   */
  @Test
  def testProcTimeLeftOuterJoin2(): Unit = {
    env.setParallelism(1)

    val sqlQuery =
      """
        |SELECT t1.a,t1.rowtime,t1.c,'----' as test,t2.c
        |FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
        |  t1.a = t2.a AND
        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
        |    t2.rowtime + INTERVAL '3' SECOND
        |""".stripMargin

    val data1: Seq[(Int, Long, String)] = Seq(
      (1, 1629369101000L, "Hi1"),
      (1, 1629369102000L, "Hi2"),
      (1, 1629369105000L, "Hi3"),
      (2, 1629369107000L, "Hi5"))

    // The timestamp of the first key-2 row controls how many partners Hi5 (...07) gets:
    // with ...03 only one join is produced (with ...09); with ...08 two joins are produced.
    val data2: Seq[(Int, Long, String)] = Seq(
      (1, 1629369101000L, "HiHi"),
      (1, 1629369102000L, "HeHe"),
      (2, 1629369108000L, "2HeHe3"),
      (2, 1629369109000L, "2HeHe33"))

    // The second tuple field is replaced by the rowtime attribute taken from the record
    // timestamp. Alternative schema tried earlier: .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
    val t1 = env.fromCollection(data1)
      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor31)
      .toTable(tEnv, 'a, 'rowtime.rowtime, 'c)
    val t2 = env.fromCollection(data2)
      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor31)
      .toTable(tEnv, 'a, 'rowtime.rowtime, 'c)

    // `registerTable` is deprecated; `createTemporaryView` is its replacement.
    tEnv.createTemporaryView("T1", t1)
    tEnv.createTemporaryView("T2", t2)

    val sink = new TestingAppendSink
    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    result.addSink(sink)
    println("====")
    result.print()
    env.execute()

    val results = sink.getAppendResults
    // Count the output rows per left-side `c` value. Substring matching keeps the check
    // independent of the Row/timestamp string format. None of the left-side values is a
    // substring of any other field in the output.
    val matchesPerLeftRow = Seq("Hi1", "Hi2", "Hi3", "Hi5")
      .map(leftC => leftC -> results.count(_.contains(leftC)))
      .toMap
    assertEquals(Map("Hi1" -> 2, "Hi2" -> 2, "Hi3" -> 1, "Hi5" -> 2), matchesPerLeftRow)
    assertEquals(7, results.size)
    // A LEFT OUTER JOIN pads unmatched left rows with nulls; every left row has a partner here.
    assertFalse(results.exists(_.contains("null")))
  }
}

/**
 * Punctuated timestamp/watermark assigner for `(Int, Long, String)` tuples.
 *
 * The event timestamp is the tuple's second field. After every element it emits a watermark
 * one millisecond behind that element's timestamp.
 *
 * NOTE(review): `AssignerWithPunctuatedWatermarks` is deprecated in newer Flink releases in
 * favour of `WatermarkStrategy`. It is kept here to match the API this test was written against.
 */
private class Row3WatermarkExtractor31
  extends AssignerWithPunctuatedWatermarks[(Int, Long, String)] {

  override def checkAndGetNextWatermark(
      lastElement: (Int, Long, String),
      extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp - 1)

  override def extractTimestamp(
      element: (Int, Long, String),
      previousElementTimestamp: Long): Long =
    element._2
}
// 还没有评论，来说两句吧……