java Flink(三十七)Flink底层ProcessFunction的常见ProcessFunction总结以及简单使用 曾经终败给现在 2022-08-31 04:39 156阅读 0赞 在flink中,转换算子是无法访问事件的时间戳信息和水位线信息的。 所以! flink提供了一些列的Low-Level转换算法,他们可以访问时间戳,watermark以及注册定时器。 **总结:** ProcessFunction可以被认为是一种提供了对状态和定时器访问的FlatMapFunction,没接收到一个数据流都会进行处理,可以通过访问时间戳来进行设置定时器等操作。 flink提供了8个ProcessFunction: ProcessFunction dataStream KeyedProcessFunction 用于KeyedStream,keyBy之后的流处理 CoProcessFunction 用于connect连接的流 ProcessJoinFunction 用于join流操作 BroadcastProcessFunction 用于广播 KeyedBroadcastProcessFunction keyBy之后的广播 ProcessWindowFunction 窗口增量聚合 ProcessAllWindowFunction 全窗口聚合 **1、KeyedBroadcastProcessFunction** 能看出来,KeyedBroadcastProcessFunction是在keyby之后进行处理的ProcessFunction。 使用模型: 参数有三个:K(key)、I(输入)、O(输出)![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70][] 查看源码: /** *处理输入流中的一个元素。 *value:传入的数据 *ctx:上下文,KeyedBroadcastProcessFunction内部实现了Context,可以访问定时器的时间戳以及当前元素*的时间戳。 *out:输出收集器 */ public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; /** *定时器 *timestamp:触发定时器的时间戳 */ public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {} **2、CoProcessFunction** 之前介绍过CoGroup,是flink用来连接流的一种方式。 主要工作流程 ![å¨è¿éæå¥å¾çæè¿°][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2JvbGluZ19jYXZhbHJ5_size_16_color_FFFFFF_t_70_pic_left] 源码分析: ** ** CoProcessFunction与KeyedBroadcastProcessFunction底层都是实现AbstractRichFunction,不过里边是分成了两个流的分别处理。处理流1和流2 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70 1][] 使用demo: 1、首先接收两个流并keyby // 流1 要先按照id分组 DataStreamSource<String> sourceStream1 = env.addSource(consumer); KeyedStream<String, Tuple> stream1 = sourceStream1.keyBy(1); // 流2 要先按照id分组 DataStreamSource<String> sourceStream2 = env.addSource(consumer); KeyedStream<String, Tuple> stream2 = sourceStream1.keyBy(1); 2、流进行连接并使用 ![20210722184953101.png][] 3、处理是对两个流分别处理,可以通过注册状态将两个流进行交互。 **3、ProcessJoinFunction** ProcessJoinFunction 用来处理join连接之后的数据 当然跟CoProcessFunction还是有一定区别的,我们看源码 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70 2][] 它的processElement里边可以直接访问两个流的数据。 **4、 BroadcastProcessFunction** ** ** 之前介绍过Broadcast State: Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。 简单来说就是正常处理的数据流A与配置流B,B里边是一些动态变化的的规则,比如我们要将流A中大于5的输出出来,后边想改成8,我们只要再向B流传入8,将这个阈值当做状态保存,就可以动态进行规则变化。 它的源码中,上边是正常数据流,下边是配置规则流,所以我们一般都是 流A connect 流B ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70 3][] 具体代码参考之前广播状态文章。 **5、KeyedBroadcastProcessFunction** KeyedBroadcastProcessFunction 跟 BroadcastProcessFunction类似,不过多了一步keyby。 看一下源码: ![20210723095255643.png][] 传入多了一个key. 具体使用demo: dataStream.keyBy(0).connect(broadcastStream).process(new KeyedBroadcastProcessFunction<String, Tuple3<String, Integer, Long>, Map<String, Object>, Tuple2<String,Integer>>() ** 6、ProcessWindowFunction** ProcessWindowFunction也是增量聚合函数,类似于AggregateFunction,但是他内部可以访问上下文等。 ProcessWindowFunction也会等窗口关闭再对窗口内的数据进行计算 简单实例: import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.Random; import java.util.concurrent.TimeUnit; public class WindowFuncMain { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.setParallelism(1); DataStream<Tuple2<String, Integer>> source = env.addSource(new RichSourceFunction<Tuple2<String, Integer>>() { private boolean isRunning = true; //班级 private String[] cla = {"A", "B", "C", "D"}; private Random random = new Random(); //满分 final Integer scores = 100; public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (isRunning) { TimeUnit.SECONDS.sleep(1); //随机选择一个班级 String cl = cla[random.nextInt(cla.length)]; Integer score = random.nextInt(scores); Tuple2 tuple2 = new Tuple2(cl, score); System.out.println("发送班级为:" + cl + "分数为:" + score); ctx.collect(tuple2); } } public void cancel() { isRunning = false; } }); //使用ProcessWindowFunction DataStream<String> resultStream = source.keyBy(value -> value.f0) .timeWindow(Time.seconds(10)) .process(new MyProcessWindowFunc()); resultStream.print(); env.execute(); } public static class MyProcessWindowFunc extends ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> { //这里的ValueState存了所有的状态,更新或者添加删除其实也是根据key去取的 private ValueState<Integer> valueState; private int size = 0; @Override public void open(Configuration parameters) throws Exception { valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("classScore", Integer.class, 0)); } @Override public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception { for (Tuple2<String, Integer> tuple2 : elements) { valueState.update(valueState.value() + tuple2.f1); size = size + 1; } StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append(s).append(":").append(valueState.value()).append(" ").append(size); out.collect(stringBuffer.toString()); size = 0; } @Override public void clear(Context context) throws Exception { valueState.clear(); } } } **7、ProcessAllWindowFunction** 全窗口函数:ProcessAllWindowFunction 简单理解一下就是可以对窗口内所有元素进行操作,我们查看一下源码: ![20210723141539176.png][] 可以看到没有传入参数key,所以 ProcessAllWindowFunction 的用法跟ProcessWindowFunction的区别就是不需要进行keyby。 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70]: /images/20220829/fef765d37bcc420b8aceae9144203030.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2JvbGluZ19jYXZhbHJ5_size_16_color_FFFFFF_t_70_pic_left]: /images/20220829/5e95e432903f447f8676fae0aa5bdf01.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70 1]: /images/20220829/8b03ef48bc1d48cab03c56a870515ce8.png [20210722184953101.png]: /images/20220829/77c11cae5a2c463cb63b7b88e7945a78.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70 2]: /images/20220829/edd0868b986a4c7ba4f13bae4b1860fe.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70 3]: /images/20220829/33ac7520e5d34a3298cc3b7a70f5b687.png [20210723095255643.png]: /images/20220829/10747e615ed54427a070eebc3729a155.png [20210723141539176.png]: /images/20220829/fe60f60a537d486685862c7e7ffca0fc.png
还没有评论,来说两句吧...