Flink窗口api、Flink窗口分配器(Flink Window Api、Flink Window Assigners) 清疚 2024-04-22 06:57 21阅读 0赞 #### 文章目录 #### * * 窗口API * * 按键分区(Keyed Partition)和非按键分区(Non-Keyed Partition) * * 按键分区窗口(Keyed Windows) * 非按键分区(Non-Keyed Windows) * 窗口 API 的调用 * 窗口分配器(Window Assigners) * * 时间窗口(TimeWindows) * * 滚动处理时间窗口(TumblingProcessingTimeWindows) * 滑动处理时间窗口(SlidingProcessingTimeWindows) * 处理时间会话窗口(ProcessingTimeSessionWindows) * 滚动事件时间窗口(TumblingEventTimeWindows) * 滑动事件时间窗口(SlidingEventTimeWindows) * 事件时间会话窗口(EventTimeSessionWindows ) * 计数窗口 * * 滚动计数窗口 * 滑动计数窗口 * 全局窗口 ### 窗口API ### #### 按键分区(Keyed Partition)和非按键分区(Non-Keyed Partition) #### 在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。 ##### 按键分区窗口(Keyed Windows) ##### 经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。 stream.keyBy(...) .window(...) ##### 非按键分区(Non-Keyed Windows) ##### 如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。在代码中,直接基于 DataStream 调用.windowAll()定义窗口。 stream.windowAll(...) 其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。 #### 窗口 API 的调用 #### 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。 stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>) ### 窗口分配器(Window Assigners) ### 定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。窗口分配数据的规则,其实就对应着不同的窗口类型。所以可以说,窗口分配器其实就是在指定窗口的类型。 窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。 #### 时间窗口(TimeWindows) #### 在较早的版本中,可以直接调用.timeWindow()来定义时间窗口;这种方式非常简洁,但使用事件时间语义时需要另外声明,程序员往往因为忘记这点而导致运行结果错误。所以在1.12 版本之后,这种方式已经被弃用了,标准的声明方式就是直接调用.window(),在里面传入对应时间语义下的窗口分配器。这样一来,我们不需要专门定义时间语义,默认就是事件时间;如果想用处理时间,那么在这里传入处理时间的窗口分配器就可以了。 ##### 滚动处理时间窗口(TumblingProcessingTimeWindows) ##### stream.keyBy(...) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(...) 这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以解决`时区问题`: .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) ##### 滑动处理时间窗口(SlidingProcessingTimeWindows) ##### stream.keyBy(...) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(...) 这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口. ##### 处理时间会话窗口(ProcessingTimeSessionWindows) ##### 窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。 stream.keyBy(...) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...) 这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。 .window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() { @Override public long extract(Tuple2<String, Long> element) { // 提取 session gap 值返回, 单位毫秒 return element.f0.length() * 1000; } })) ) 这里.withDynamicGap()方法需要传入一个 SessionWindowTimeGapExtractor 作为参数,用来定义 session gap 的动态提取逻辑。在这里,我们提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔。 ##### 滚动事件时间窗口(TumblingEventTimeWindows) ##### stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...) 这里.of()方法也可以传入第二个参数 offset,用于设置窗口起始点的偏移量。 ##### 滑动事件时间窗口(SlidingEventTimeWindows) ##### stream.keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(...) ##### 事件时间会话窗口(EventTimeSessionWindows ) ##### 用法与处理事件会话窗口完全一致。 stream.keyBy(...) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...) #### 计数窗口 #### 计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink 为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现。 ##### 滚动计数窗口 ##### 滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。 stream.keyBy(...).countWindow(10) 我们定义了一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发计算执行并关闭窗口。 ##### 滑动计数窗口 ##### 与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。 stream.keyBy(...).countWindow(10,3) 我们定义了一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果。 #### 全局窗口 #### 全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由 GlobalWindows 类提供。 stream.keyBy(...).window(GlobalWindows.create()); 需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。
还没有评论,来说两句吧...