Flink Table 基于Processing Time、Event Time的多种Window实现 不念不忘少年蓝@ 2021-12-10 06:17 166阅读 0赞 ## Flink Table 基于Processing Time、Event Time的多种Window实现 ## Flink 提供了Table Api,用来统一批流入口,使用Flink Table Api,直接处理流,会特别的简洁,易用。下面来看下使用Flink Table Api实现 Tumble Window(翻滚窗口)、Slide Window(滑动窗口)、Session Window(会话)。 #### 一、基于Processing Time的Window实现示例代码 #### public class SqlTumbleWindowProcessTimeStream { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // ProcessingTime env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Properties p = new Properties(); p.setProperty("bootstrap.servers", "localhost:9092"); p.setProperty("group.id", "test"); Kafka kafka = new Kafka().properties(p).topic("user").version("0.10"); Schema schema = new Schema() .field("userId", Types.STRING) .field("name", Types.STRING) .field("age", Types.STRING) .field("sex", Types.STRING) .field("createTime", Types.BIG_DEC) .field("updateTime", Types.BIG_DEC) // 设置 Processing Time .field("procTime", Types.SQL_TIMESTAMP).proctime(); tableEnv.connect(kafka) .withSchema(schema) .withFormat(new Json().deriveSchema()) .inAppendMode() .registerTableSource("Users"); Table table = tableEnv.sqlQuery("select * from Users"); tableEnv.toAppendStream(table, TypeInformation.of(Row.class)).print("row:"); // tumble window Table table1 = tableEnv.sqlQuery("SELECT userId, TUMBLE_START(procTime, INTERVAL '1' MINUTE) AS rStart, COUNT(1) AS countNum from Users GROUP BY TUMBLE(procTime, INTERVAL '1' MINUTE),userId"); tableEnv.toAppendStream(table1, TypeInformation.of(Row.class)).print("tumble:"); // tumble window,这里试下TUMBLE_END、TUMBLE_PROCTIME api Table table2 = tableEnv.sqlQuery("SELECT userId,TUMBLE_START(procTime, INTERVAL '1' MINUTE) AS rStart, TUMBLE_END(procTime, INTERVAL '1' MINUTE) AS rEnd, TUMBLE_PROCTIME(procTime,INTERVAL '1' MINUTE) AS pTime,COUNT(1) AS countNum from Users GROUP BY TUMBLE(procTime, INTERVAL '1' MINUTE),userId"); tableEnv.toAppendStream(table2, TypeInformation.of(Row.class)).print("tumble2:"); // HOP()表示slide window Table table3 = tableEnv.sqlQuery("SELECT userId,HOP_START(procTime, INTERVAL '1' MINUTE,INTERVAL '5' MINUTE) as hStart,COUNT(1) AS countNum from Users GROUP BY HOP(procTime,INTERVAL '1' MINUTE,INTERVAL '5' MINUTE),userId"); tableEnv.toAppendStream(table3, TypeInformation.of(Row.class)).print("hop:"); // session window Table table4 = tableEnv.sqlQuery("SELECT userId,SESSION_START(procTime,INTERVAL '1' MINUTE) AS sStart,COUNT(1) AS countNum from Users GROUP BY SESSION(procTime,INTERVAL '1' MINUTE),userId"); tableEnv.toAppendStream(table4, TypeInformation.of(Row.class)).print("session:"); env.execute("SqlTumbleWindowProcessTimeStream"); } } #### 二、基于Event Time的Window实现示例代码 #### public class SqlTumbleWindowEventTimeStream { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Properties p = new Properties(); p.setProperty("bootstrap.servers", "localhost:9092"); p.setProperty("group.id", "test"); Kafka kafka = new Kafka().properties(p).topic("user").version("0.10"); Schema schema = new Schema() .field("userId", Types.STRING) .field("name", Types.STRING) .field("age", Types.STRING) .field("sex", Types.STRING) //.field("createTime", Types.BIG_DEC) .field("updateTime", Types.BIG_DEC) // 设置 rowtime,把字段timestamp设置为Event Time的水印时间戳 .field("rowTime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("createTime").watermarksPeriodicBounded(60000)); tableEnv.connect(kafka) .withSchema(schema) .withFormat(new Json().deriveSchema()) .inAppendMode() .registerTableSource("Users"); Table table = tableEnv.sqlQuery("select * from Users"); tableEnv.toAppendStream(table, TypeInformation.of(Row.class)).print("row:"); Table table1 = tableEnv.sqlQuery("SELECT userId, TUMBLE_START(rowTime, INTERVAL '1' MINUTE) AS rStart, COUNT(1) AS countNum from Users GROUP BY TUMBLE(rowTime, INTERVAL '1' MINUTE),userId"); tableEnv.toAppendStream(table1, TypeInformation.of(Row.class)).print("tumble:"); Table table2 = tableEnv.sqlQuery("SELECT userId,TUMBLE_START(rowTime, INTERVAL '1' MINUTE) AS rStart, TUMBLE_END(rowTime, INTERVAL '1' MINUTE) AS rEnd, TUMBLE_ROWTIME(rowTime,INTERVAL '1' MINUTE) AS pTime,COUNT(1) AS countNum from Users GROUP BY TUMBLE(rowTime, INTERVAL '1' MINUTE),userId"); tableEnv.toAppendStream(table2, TypeInformation.of(Row.class)).print("tumble2:"); Table table3 = tableEnv.sqlQuery("SELECT userId,HOP_START(rowTime, INTERVAL '1' MINUTE,INTERVAL '5' MINUTE) as hStart,COUNT(1) AS countNum from Users GROUP BY HOP(rowTime,INTERVAL '1' MINUTE,INTERVAL '5' MINUTE),userId"); tableEnv.toAppendStream(table3, TypeInformation.of(Row.class)).print("hop:"); Table table4 = tableEnv.sqlQuery("SELECT userId,SESSION_START(rowTime,INTERVAL '1' MINUTE) AS sStart,COUNT(1) AS countNum from Users GROUP BY SESSION(rowTime,INTERVAL '1' MINUTE),userId"); tableEnv.toAppendStream(table4, TypeInformation.of(Row.class)).print("session:"); env.execute("SqlTumbleWindowProcessTimeStream"); } }
还没有评论,来说两句吧...