Flink--Table Api 和 sql 之时间属性(一)

偏执的太偏执、 2023-10-06 18:36 31阅读 0赞

Flink总共有三种时间语义:Processing time(处理时间)、Event time(事件时间)以及Ingestion time(摄入时间)。关于这些时间语义的具体解释,可以参考另一篇文章Flink的时间与watermarks详解。本文主要讲解Flink Table API & SQL中基于时间的算子如何定义时间语义。通过本文你可以了解到:

时间属性的简介
处理时间
事件时间

时间属性概述

Flink TableAPI&SQL中的基于时间的操作(如window),需要指定时间语义,表可以根据指定的时间戳提供一个逻辑时间属性。

时间属性是表schama的一部分,当使用DDL创建表时、DataStream转为表时或者使用TableSource时,会定义时间属性。一旦时间属性被定义完成,该时间属性可以看做是一个字段的引用,从而在基于时间的操作中使用该字段。

时间属性像一个时间戳,可以被访问并参与计算,如果一个时间属性参与计算,那么该时间属性会被雾化成一个常规的时间戳,常规的时间戳不能与Flink的时间与水位线兼容,不能被基于时间的操作所使用。

Flink TableAPI & SQL所需要的时间属性可以通过Datastream程序中指定,如下:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认
  3. // 可以选择:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  5. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

.一、基于 Table Api 和 sql 混用

1.处理时间

基于本地的机器时间,是一种最简单的时间语义,但是不能保证结果一致性,使用该时间语义不需要提取时间戳和生成水位线。总共有三种方式定义处理时间属性。

1.1 DDL语句创建表时定义处理时间

处理时间的属性可以在DDL语句中被定义为一个计算列,需要使用PROCTIME()函数,如下所示:

  1. CREATE TABLE user_actions (
  2. user_name STRING,
  3. data STRING,
  4. user_action_time AS PROCTIME() -- 声明一个额外字段,作为处理时间属性
  5. ) WITH (
  6. ...
  7. );
  8. SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
  9. FROM user_actions
  10. GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗口
1.2DataStream转为Table的过程中定义处理时间

在将DataStream转为表时,在schema定义中可以通过.proctime属性指定时间属性,并将其放在其他schema字段的最后面,具体如下:

  1. DataStream<Tuple2<String, String>> stream = ...;
  2. // 声明一个额外逻辑字段作为处理时间属性
  3. Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");
  4. WindowedTable windowedTable = table.window(Tumble.over("10.minutes")
  5. .on("user_action_time")
  6. .as("userActionWindow"));
1.3 使用TableSource

自定义TableSource并实现DefinedProctimeAttribute 接口,如下:

  1. // 定义个带有处理时间属性的table source
  2. public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
  3. @Override
  4. public TypeInformation<Row> getReturnType() {
  5. String[] names = new String[] {
  6. "user_name" , "data"};
  7. TypeInformation[] types = new TypeInformation[] {
  8. Types.STRING(), Types.STRING()};
  9. return Types.ROW(names, types);
  10. }
  11. @Override
  12. public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
  13. // 创建stream
  14. DataStream<Row> stream = ...;
  15. return stream;
  16. }
  17. @Override
  18. public String getProctimeAttribute() {
  19. // 该字段会追加到schema中,作为第三个字段
  20. return "user_action_time";
  21. }
  22. }
  23. // 注册table source
  24. tEnv.registerTableSource("user_actions", new UserActionSource());
  25. WindowedTable windowedTable = tEnv
  26. .from("user_actions")
  27. .window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

2.事件时间

基于记录的具体时间戳,即便是存在乱序或者迟到数据也会保证结果的一致性。总共有三种方式定义处理时间属性.

2.1 DDL语句创建表时定事件时间

基于记录的具体时间戳,即便是存在乱序或者迟到数据也会保证结果的一致性。总共有三种方式定义处理时间属性,具体如下

读取kafka数据源,把系统时间设置为watermark.

  1. CREATE TABLE hsh5(
  2. symbol VARCHAR ,
  3. prod_code VARCHAR ,
  4. hq_type_code VARCHAR ,
  5. user_action_time TIMESTAMP(3),// 时间类型要指定3位
  6. -- 声明user_action_time作为事件时间属性,并允许5S的延迟
  7. WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
  8. )WITH (
  9. 'connector' = 'kafka',
  10. 'topic'='xxx',
  11. 'properties.bootstrap.servers' = 'xxx',
  12. 'format' = 'json',
  13. 'scan.startup.mode' = 'latest-offset'
  14. -- 'scan.startup.mode' = 'earliest-offset' // 消费最早的
  15. );
  16. SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
  17. FROM user_actions
  18. GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
2.2 DataStream转为Table的过程中定义事件时间

当定义Schema时通过.rowtime属性指定事件时间属性,必须在DataStream中指定时间戳与水位线。例如在数据集中,事件时间属性为event_time,此时Table中的事件时间字段中可以通过’event_time. rowtime‘来指定。

目前Flink支持两种方式定义EventTime字段,如下:

  1. // 方式1:
  2. // 提取timestamp并分配watermarks
  3. DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
  4. // 声明一个额外逻辑字段作为事件时间属性
  5. // 在table schema的末尾使用user_action_time.rowtime定义事件时间属性
  6. // 系统会在TableEnvironment中获取事件时间属性
  7. Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");
  8. // 方式2:
  9. // 从第一个字段提取timestamp并分配watermarks
  10. DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
  11. // 第一个字段已经用来提取时间戳,可以直接使用对应的字段作为事件时间属性
  12. Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
  13. // 使用:
  14. WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time")
  15. .as("userActionWindow"));
2.3 使用TableSource

另外也可以在创建TableSource的时候,实现DefinedRowtimeAttributes接口来定义EventTime字段,在接口中需要实现getRowtimeAttributeDescriptors方法,创建基于EventTime的时间属性信息。

  1. // 定义带有rowtime属性的table source
  2. public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
  3. @Override
  4. public TypeInformation<Row> getReturnType() {
  5. String[] names = new String[] {
  6. "user_name", "data", "user_action_time"};
  7. TypeInformation[] types =
  8. new TypeInformation[] {
  9. Types.STRING(), Types.STRING(), Types.LONG()};
  10. return Types.ROW(names, types);
  11. }
  12. @Override
  13. public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
  14. // 创建流,基于user_action_time属性分配水位线
  15. DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
  16. return stream;
  17. }
  18. @Override
  19. public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
  20. // 标记user_action_time字段作为事件时间属性
  21. // 创建user_action_time描述符,用来标识时间属性字段
  22. RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
  23. "user_action_time",
  24. new ExistingField("user_action_time"),
  25. new AscendingTimestamps());
  26. List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
  27. return listRowtimeAttrDescr;
  28. }
  29. }
  30. // register表
  31. tEnv.registerTableSource("user_actions", new UserActionSource());
  32. WindowedTable windowedTable = tEnv
  33. .from("user_actions")
  34. .window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/
https://mp.weixin.qq.com/s/Qvi2AshGjkaES-Ce\_dg6UA

发表评论

表情:
评论列表 (有 0 条评论,31人围观)

还没有评论,来说两句吧...

相关阅读