Flink实操 : Watermark

深藏阁楼爱情的钟 2022-11-05 08:46 418阅读 0赞

.

  • 一 .前言
  • 二 .概念
    • 2.1. Flink 时间语义
    • 2.2. 时间的特性
    • 2.3. Timestamp 分配和 Watermark 生成
    • 2.4. Watermark 传播
    • 2.5. ProcessFunction
    • 2.6. Watermark 处理
    • 2.7.多流的Watermark
  • 三 .代码实例
    • 3.1. 直接在数据源上使用WaterMark [重点]
    • 3.2. 在非数据源的操作之后使用 WaterMark [重点]
    • 3.3. 处理空闲数据源
    • 3.4.单调递增时间戳分配器
    • 3.5. 数据之间存在最大固定延迟的时间戳分配器
    • 3.6. Flink1.12之前版本 [过时]

一 .前言

二 .概念

在不同的应用场景中时间语义是各不相同的,Flink 作为一个先进的分布式流处理引擎,它本身支持不同的时间语义。
其核心是 Processing TimeEvent Time(Row Time)
这两类时间主要的不同点如下表所示:

在这里插入图片描述

Processing Time 是来模拟我们真实世界的时间,其实就算是处理数据的节点本地时间也不一定就是完完全全的我们真实世界的时间,所以说它是用来模拟真实世界的时间。

Event Time数据世界的时间,就是我们要处理的数据流世界里面的时间。

  • 关于他们的获取方式:
    Process Time 是通过直接去调用本地机器的时间,
    Event Time 则是根据每一条处理记录所携带的时间戳来判定。

这两种时间在 Flink 内部的处理以及还是用户的实际使用方面,难易程度都是不同的。

相对而言的 Processing Time 处理起来更加的简单,而 Event Time 要更麻烦一些。
而在使用 Processing Time 的时候,我们得到的处理结果(或者说流处理应用的内部状态)是不确定的
而因为在 Flink 内部对 Event Time 做了各种保障,使用 Event Time 的情况下,无论重放数据多少次,都能得到一个相对确定可重现的结果。

  • 因此在判断应该使用 Processing Time 还是 Event Time 的时候,可以遵循一个原则:
  1. 当你的应用遇到某些问题要从上一个 checkpoint 或者 savepoint 进行重放,是不是希望结果完全相同。
  2. 如果希望结果完全相同,就只能用 Event Time;
  3. 如果接受结果不同,则可以用 Processing Time。

Processing Time 的一个常见的用途是,我们要根据现实时间来统计整个系统的吞吐,比如要计算现实时间一个小时处理了多少条数据,这种情况只能使用 Processing Time。

2.2. 时间的特性

时间的一个重要特性是:时间只能递增,不会来回穿越。

在使用时间的时候我们要充分利用这个特性。
假设我们有这么一些记录,然后我们来分别看一下 Processing Time 还有 Event Time 对于时间的处理。

  • 对于 Processing Time,因为我们是使用的是本地节点的时间(假设这个节点的时钟同步没有问题),我们每一次取到的 Processing Time 肯定都是递增的,递增就代表着有序,所以说我们相当于拿到的是一个有序的数据流。
  • 而在用 **Event Time 的时候因为时间是绑定在每一条的记录上的,由于网络延迟、程序内部逻辑、或者其他一些分布式系统的原因,数据的时间可能会存在一定程度的乱序。**在 Event Time 场景下,我们把每一个记录所包含的时间称作 Record Timestamp。
    如果 Record Timestamp 所得到的时间序列存在乱序,我们就需要去处理这种情况。

如果单条数据之间是乱序,我们就考虑对于整个序列进行更大程度的离散化

简单地讲,就是把数据按照一定的条数组成一些小批次,但这里的小批次并不是攒够多少条就要去处理,而是为了对他们进行时间上的划分。经过这种更高层次的离散化之后,我们会发现最右边方框里的时间就是一定会小于中间方框里的时间中间框里的时间也一定会小于最左边方框里的时间
在这里插入图片描述
这个时候我们在整个时间序列里插入一些类似于标志位的一些特殊的处理数据,这些特殊的处理数据叫做 watermark

一个 watermark 本质上就代表了这个 watermark 所包含的 timestamp 数值,
表示以后到来的数据已经再也没有小于或等于这个时间的了

2.3. Timestamp 分配和 Watermark 生成

Flink 支持两种 watermark 生成方式。

  • 第一种是在 SourceFunction 中产生,相当于把整个的 timestamp 分配和 watermark 生成的逻辑放在流处理应用的源头
    我们可以在 SourceFunction 里面通过这两个方法产生 watermark.
    通过 collectWithTimestamp 方法发送一条数据,其中第一个参数就是我们要发送的数据,第二个参数就是这个数据所对应的时间戳;也可以调用 emitWatermark 方法去产生一条 watermark,表示接下来不会再有时间戳小于等于这个数值记录。
  • 有时候我们不想在 SourceFunction 里生成 timestamp 或者 watermark,或者说使用的 SourceFunction 本身不支持,我们还可以在使用 DataStream API 的时候指定,调用的 DataStream.assignTimestampsAndWatermarks 这个方法,能够接收不同的 timestamp 和 watermark 的生成器。

总体上而言生成器可以分为两类:第一类是定期生成器; 第二类是根据一些在流处理数据流中遇到的一些特殊记录生成的。

在这里插入图片描述
两者的区别主要有三个方面,

  • 首先定期生成现实时间驱动的,这里的“定期生成”主要是指 watermark(因为 timestamp 是每一条数据都需要有的),即定期会调用生成逻辑去产生一个 watermark
  • 而根据特殊记录生成是数据驱动的,即是否生成 watermark 不是由现实时间来决定,而是当看到一些特殊的记录就表示接下来可能不会有符合条件的数据再发过来了,这个时候相当于每一次分配 Timestamp 之后都会调用用户实现的 watermark 生成方法,用户需要在生成方法中去实现 watermark 的生成逻辑。
  • 大家要注意的是就是我们在分配 timestamp 和生成 watermark 的过程,虽然在 SourceFunction 和 DataStream 中都可以指定,但是还是建议生成的工作越靠近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据是否乱序。

Flink 内部提供了很好的机制去保证这些 timestamp 和 watermark 被正确地传递到下游的节点。

2.4. Watermark 传播

在这里插入图片描述
具体的传播策略基本上遵循这三点。

  • 首先,watermark 会以广播的形式在算子之间进行传播。比如说上游的算子,它连接了三个下游的任务,它会把自己当前的收到的 watermark 以广播的形式传到下游
  • 第二,如果在程序里面收到了一个 Long.MAX_VALUE 这个数值的 watermark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就是一个终止的一个标志。
  • 第三,对于单流而言,这个策略比较好理解,而对于有多个输入的算子,watermark 的计算就有讲究了,一个原则是:单输入取其大,多输入取小。

举个例子,假设这边蓝色的块代表一个算子的一个任务,然后它有三个输入,分别是 W1、W2、W3,这三个输入可以理解成任何的输入,这三个输入可能是属于同一个流,也可能是属于不同的流。然后在计算 watermark 的时候,对于单个输入而言是取他们的最大值,因为我们都知道 watermark 应该遵循一个单调递增的一个原则。对于多输入,它要统计整个算子任务的 watermark 时,就会取这三个计算出来的 watermark 的最小值。即一个多个输入的任务,它的 watermark 受制于最慢的那条输入流。这一点类似于木桶效应,整个木桶中装的水会就是受制于最矮的那块板。

在这里插入图片描述

watermark 在传播的时候有一个特点是,它的传播是幂等的。
多次收到相同的 watermark,甚至收到之前的 watermark 都不会对最后的数值产生影响,因为对于单个输入永远是取最大的,而对于整个任务永远是取一个最小的

同时我们可以注意到这种设计其实有一个局限,具体体现在它没有区分你这个输入是一条流多个 partition 还是来自于不同的逻辑上的流的 JOIN。

对于同一个流的不同 partition,我们对他做这种强制的时钟同步是没有问题的,因为一开始就是把一条流拆散成不同的部分,但每一个部分之间共享相同的时钟。

但是如果算子的任务是在做类似于 JOIN 操作,那么要求你两个输入的时钟强制同步其实没有什么道理的,因为完全有可能是把一条离现在时间很近的数据流和一个离当前时间很远的数据流进行 JOIN,这个时候对于快的那条流,因为它要等慢的那条流,所以说它可能就要在状态中去缓存非常多的数据,这对于整个集群来说是一个很大的性能开销

2.5. ProcessFunction

在正式介绍 watermark 的处理之前,先简单介绍 ProcessFunction,因为 watermark 在任务里的处理逻辑分为内部逻辑外部逻辑

  • 外部逻辑其实就是通过 ProcessFunction 来体现的,如果你需要使用 Flink 提供的时间相关的 API 的话就只能写在 ProcessFunction 里。

ProcessFunction 和时间相关的功能主要有三点:

  1. 第一点就是根据你当前系统使用的时间语义不同,你可以去获取当前你正在处理这条记录的 Record Timestamp,或者当前的 Processing Time
  2. 第二点就是它可以获取当前算子的时间,可以把它理解成当前的 watermark
  3. 第三点就是为了在 ProcessFunction 中去实现一些相对复杂的功能,允许注册一些 timer(定时器)。

比如说在 watermark 达到某一个时间点的时候就触发定时器,所有的这些回调逻辑也都是由用户来提供,涉及到如下三个方法,registerEventTimeTimer、registerProcessingTimeTimer 和 onTimer。在 onTimer 方法中就需要去实现自己的回调逻辑,当条件满足时回调逻辑就会被触发。

一个简单的应用是,我们在做一些时间相关的处理的时候,可能需要缓存一部分数据,但这些数据不能一直去缓存下去,所以需要有一些过期的机制,我们可以通过 timer 去设定这么一个时间,指定某一些数据可能在将来的某一个时间点过期,从而把它从状态里删除掉。所有的这些和时间相关的逻辑在 Flink 内部都是由自己的 Time Service(时间服务)完成的。

2.6. Watermark 处理

在这里插入图片描述

2.7.多流的Watermark

  1. 在实际的计算中,往往会出现一个作业中会处理多个source的数据, source的数据进行groupBy分组, 那么来自不同source的相同的keyshuffle到同一个处理节点. 并且携带各自的Watermark .
  2. Flink内部要保证Watermark的**单调递增** , 多个sourceWatermark汇聚到一起是不可能单调递增的.
  3. Flink内部实现每一个边上只能有一个递增的Watermark, 当出现多个流携带EventTime汇聚到一起(groupBy或者Union). **Flink会选择所有输入流中EventTime中最小的一个向下游流出**. 从而保证Watermark的单调递增和数据的完整性.

在这里插入图片描述

三 .代码实例

Watermark 就是一个 时间戳 ,Flink可以给数据流添加Watermark,可以理解为:收到一条消息后,额外给这个消
息添加了一个时间字段,这就是添加Watermark。

  • Watermark并不会影响原有Eventtime
  • 当数据流添加Watermark后,会按照Watermark时间来触发窗口计算
  • 一般会设置Watermark时间,比Eventtime小几秒钟
  • 当接收到的 Watermark时间 >= 窗口的endTime ,则触发计算

在这里插入图片描述

3.1. 直接在数据源上使用WaterMark [重点]

  1. package com.boyi.watermark
  2. import java.time.Duration
  3. import java.util.Properties
  4. import org.apache.flink.api.common.eventtime. WatermarkStrategy
  5. import org.apache.flink.api.common.serialization.SimpleStringSchema
  6. import org.apache.flink.api.scala.createTypeInformation
  7. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  8. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
  9. import org.apache.flink.streaming.api.windowing.time.Time
  10. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
  11. import org.apache.kafka.clients.CommonClientConfigs
  12. // 直接在数据源上使用WaterMark
  13. //当使用 Apache Kafka 连接器作为数据源时,每个 Kafka 分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)。
  14. // 然而,当使用 Kafka 数据源时,多个分区常常并行使用,
  15. // 因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式(这是 Kafka 消费客户端所固有的)。
  16. //
  17. // 在这种情况下,你可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。
  18. // 使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,
  19. // 并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。
  20. //
  21. //例如,
  22. // 如果每个 Kafka 分区中的事件时间戳严格递增,则使用时间戳单调递增按分区生成的 watermark 将生成完美的全局 watermark。
  23. //
  24. // 注意,我们在示例中未使用 TimestampAssigner,而是使用了 Kafka 记录自身的时间戳。
  25. //
  26. object KafkaDataSourceWaterMarkDemo {
  27. def main(args : Array[String]) : Unit = {
  28. // 1. 创建流处理运行环境
  29. val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  30. env.setParallelism(1)
  31. val kafkaCluster = "192.168.101.30:9092"
  32. val kafkaTopic = "test"
  33. // 2. 创建Kafka数据流
  34. val props = new Properties()
  35. props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,kafkaCluster)
  36. val kafkaSource = new FlinkKafkaConsumer(kafkaTopic,new SimpleStringSchema(),props)
  37. // 3. 设置watermark
  38. kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(20)))
  39. // 4. 添加数据源
  40. val watermarkDataStream = env.addSource(kafkaSource)
  41. // 5. 处理数据
  42. watermarkDataStream.flatMap(_.split(" "))
  43. .map(x => (x,1))
  44. .keyBy(_._1)
  45. .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  46. .reduce((x1,x2) =>(x1._1, x1._2+x1._2))
  47. .print()
  48. // 6. 执行
  49. env.execute("KafkaWaterMarkDemo")
  50. }
  51. }

3.2. 在非数据源的操作之后使用 WaterMark [重点]

  • 示例

编写代码, 计算5秒内,用户的订单总额

订单数据(订单ID——UUID、用户ID、时间戳、订单金额),要求添加水印来解决网络延迟问题。

  • 步骤
  1. 创建流处理运行环境
  2. 设置处理时间为EventTime
  3. 创建一个订单样例类Order,包含四个字段(订单ID、用户ID、订单金额、时间戳)
  4. 创建一个自定义数据源

    • 随机生成订单ID(UUID)
    • 随机生成用户ID(0-2)
    • 随机生成订单金额(0-100)
    • 时间戳为当前系统时间
    • 每隔1秒生成一个订单
  5. 添加水印

    • 允许延迟2秒
    • 在获取水印方法中,打印水印时间、事件时间和当前系统时间
  6. 按照用户进行分流
  7. 设置5秒的时间窗口
  8. 进行聚合计算
  9. 打印结果数据
  10. 启动执行流处理

    package com.boyi.watermark

    import java.time.Duration
    import java.util.concurrent.TimeUnit
    import java.util.{ Date, UUID}

    import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.functions.source.{ RichSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.scala.{ DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    import scala.util.Random

    // 在非数据源的操作之后使用 WaterMark

    object WaterMarkDemo {
    // 创建一个订单样例类Order,包含四个字段(订单ID、用户ID、订单金额、时间戳)
    case class Order (orderId: String, userId: Int, money: Long, timestamp: Long)

    def main(args : Array[String]) : Unit = {

    1. // 1. 创建流处理运行环境
    2. val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    3. env.setParallelism(1)
    4. // 2. 创建一个自定义数据源
    5. val orderDataStream = env.addSource(new RichSourceFunction[Order] {
    6. var isRunning = true
    7. override def run(sourceContext: SourceFunction.SourceContext[Order]): Unit = {
    8. while (isRunning){
    9. // - 随机生成订单ID(UUID)
    10. // - 随机生成用户ID(0-2)
    11. // - 随机生成订单金额(0-100)
    12. // - 时间戳为当前系统时间
    13. // - 每隔1秒生成一个订单
    14. val order = Order(UUID.randomUUID().toString,Random.nextInt(3),Random.nextInt(101),new Date().getTime)
    15. sourceContext.collect(order)
    16. TimeUnit.SECONDS.sleep(2)
    17. }
    18. }
    19. override def cancel(): Unit = {
    20. isRunning = false
    21. }
    22. })
  1. // 3. 添加Watermark
  2. val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks(
  3. WatermarkStrategy.forBoundedOutOfOrderness[Order](Duration.ofSeconds(20))
  4. .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
  5. override def extractTimestamp(element: Order, recordTimestamp: Long): Long = {
  6. element.timestamp
  7. }
  8. })
  9. .withIdleness(Duration.ofMinutes(1))
  10. )
  11. // 6. 按照用户进行分流
  12. // 7. 设置5秒的时间窗口
  13. // 8. 进行聚合计算
  14. // 9. 打印结果数据
  15. // 10. 启动执行流处理
  16. watermarkDataStream.keyBy(_.userId)
  17. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  18. .reduce {
  19. (order1, order2) =>
  20. Order(order2.orderId, order2.userId, order1.money + order2.money, 0)
  21. }
  22. .print()
  23. env.execute("WarkMarkDemoJob")
  24. }
  25. }

3.3. 处理空闲数据源

  1. /** * * * 如果数据源中的某一个分区/分片在一段时间内未发送事件数据, * 则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。 * * 我们称这类数据源为空闲输入或空闲源。 * * 在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。 * 由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。 * * 为了解决这个问题, * 你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。 * * WatermarkStrategy 为此提供了一个工具接口 * WatermarkStrategy * .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)) * .withIdleness(Duration.ofMinutes(1)) * */
  2. object FreeDataSourceWaterMark {
  3. def main(args : Array[String]) : Unit = {
  4. // 1. 创建流处理运行环境
  5. val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  6. env.setParallelism(1)
  7. val kafkaCluster = "192.168.101.30:9092"
  8. val kafkaTopic = "test"
  9. // 2. 创建Kafka数据流
  10. val props = new Properties()
  11. props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,kafkaCluster)
  12. val kafkaSource = new FlinkKafkaConsumer(kafkaTopic,new SimpleStringSchema(),props)
  13. // 3. 设置watermark
  14. kafkaSource.assignTimestampsAndWatermarks(
  15. WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(20))
  16. .withIdleness(Duration.ofMinutes(1))
  17. )
  18. // 4. 添加数据源
  19. val watermarkDataStream = env.addSource(kafkaSource)
  20. // 5. 处理数据
  21. watermarkDataStream.flatMap(_.split(" "))
  22. .map(x => (x,1))
  23. .keyBy(_._1)
  24. .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  25. .reduce((x1,x2) =>(x1._1, x1._2+x1._2))
  26. .print()
  27. // 6. 执行
  28. env.execute("KafkaWaterMarkDemo")
  29. }
  30. }

3.4.单调递增时间戳分配器

周期性 watermark 生成方式的一个最简单特例就是你给定的数据源中数据的时间戳升序出现。在这种情况下,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。

注意:在 Flink 应用程序中,如果是并行数据源,则只要求并行数据源中的每个单分区数据源任务时间戳递增。例如,设置每一个并行数据源实例都只读取一个 Kafka 分区,则时间戳只需在每个 Kafka 分区内递增即可。Flink 的 watermark 合并机制会在并行数据流进行分发(shuffle)、联合(union)、连接(connect)或合并(merge)时生成正确的 watermark

WatermarkStrategy.forMonotonousTimestamps()

3.5. 数据之间存在最大固定延迟的时间戳分配器

另一个周期性 watermark 生成的典型例子是,watermark 滞后于数据流中最大(事件时间)时间戳一个固定的时间量。

该示例可以覆盖的场景是你预先知道数据流中的数据可能遇到的最大延迟,例如,在测试场景下创建了一个自定义数据源,并且这个数据源的产生的数据的时间戳在一个固定范围之内。Flink 针对上述场景提供了 boundedOutfordernessWatermarks 生成器,该生成器将 maxOutOfOrderness 作为参数,该参数代表在计算给定窗口的结果时,允许元素被忽略计算之前延迟到达的最长时间。其中延迟时长就等于 t_w - t ,其中 t 代表元素的(事件时间)时间戳,t_w 代表前一个 watermark 对应的(事件时间)时间戳。如果 lateness > 0,则认为该元素迟到了,并且在计算相应窗口的结果时默认会被忽略。有关使用延迟元素的详细内容,请参阅有关允许延迟的文档。

WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(10))

3.6. Flink1.12之前版本 [过时]

参考代码

  1. import java.util.UUID
  2. import java.util.concurrent.TimeUnit
  3. import org.apache.commons.lang.time.FastDateFormat
  4. import org.apache.flink.api.scala._
  5. import org.apache.flink.streaming.api.TimeCharacteristic
  6. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
  7. import org.apache.flink.streaming.api.functions.source.{ RichSourceFunction, SourceFunction}
  8. import org.apache.flink.streaming.api.scala.{ DataStream, StreamExecutionEnvironment}
  9. import org.apache.flink.streaming.api.watermark.Watermark
  10. import org.apache.flink.streaming.api.windowing.time.Time
  11. import scala.util.Random
  12. object WaterMarkDemo {
  13. // 3. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳)
  14. case class Order(orderId: String, userId: Int, money: Long, timestamp: Long)
  15. def main(args: Array[String]): Unit = {
  16. // 1. 创建流处理运行环境
  17. val env = StreamExecutionEnvironment.getExecutionEnvironment
  18. // 2. 设置处理时间为`EventTime`
  19. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  20. // 4. 创建一个自定义数据源
  21. val orderDataStream: DataStream[Order] = env.addSource(new RichSourceFunction[Order] {
  22. var isRunning = true
  23. override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
  24. while (isRunning) {
  25. // - 随机生成订单ID(UUID)
  26. // - 随机生成用户ID(0-2)
  27. // - 随机生成订单金额(0-100)
  28. // - 时间戳为当前系统时间
  29. // - 每隔1秒生成一个订单
  30. val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101), new java.util.Date().getTime)
  31. ctx.collect(order)
  32. TimeUnit.SECONDS.sleep(1)
  33. }
  34. }
  35. override def cancel(): Unit = isRunning = false
  36. })
  37. // 5. 添加水印
  38. val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order] {
  39. var currentTimestamp = 0L
  40. val delayTime = 2000
  41. override def getCurrentWatermark: Watermark = {
  42. // - 允许延迟2秒
  43. // - 在获取水印方法中,打印水印时间、当前事件时间和当前系统时间
  44. val watermark = new Watermark(currentTimestamp - delayTime)
  45. val dateFormat = FastDateFormat.getInstance("HH:mm:ss")
  46. println(s"当前水印时间:${dateFormat.format(watermark.getTimestamp)}, 当前事件时间: ${dateFormat.format(currentTimestamp)}, 当前系统时间: ${dateFormat.format(System.currentTimeMillis())}")
  47. watermark
  48. }
  49. override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
  50. val timestamp = element.timestamp
  51. currentTimestamp = Math.max(currentTimestamp, timestamp)
  52. currentTimestamp
  53. }
  54. })
  55. // 6. 按照用户进行分流
  56. // 7. 设置5秒的时间窗口
  57. // 8. 进行聚合计算
  58. // 9. 打印结果数据
  59. // 10. 启动执行流处理
  60. watermarkDataStream.keyBy(_.userId)
  61. .timeWindow(Time.seconds(5))
  62. .reduce {
  63. (order1, order2) =>
  64. Order(order2.orderId, order2.userId, order1.money + order2.money, 0)
  65. }
  66. .print()
  67. env.execute("WarkMarkDemoJob")
  68. }
  69. }

参考:

官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/event\_timestamps\_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/event\_time.html
https://ververica.cn/developers/advanced-tutorial-2-time-depth-analysis/

发表评论

表情:
评论列表 (有 0 条评论,418人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Flink DataStream WaterMark

    Flink DataStream WaterMark 为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常通过从元素中的