【零基础学flink】flink DataStream API 详解 蔚落 2022-04-24 15:18 170阅读 0赞 Flink中的DataStream主要用于实现数据流的转换操作(例如,过滤,更新状态,定义窗口,聚合)。最初可以从各种源(例如,消息队列,套接字流,文件)创建数据流(DataStream)。结果通过sink返回,sink操作主要有:将数据写入文件、标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机集群上执行。 有关Flink API [基本概念][Link 1]的介绍,请参阅[基本概念][Link 1]。 为了创建您自己的Flink DataStream程序,我们鼓励您从[Flink程序的解剖][Flink]开始, 逐步添加您自己的 [stream transformation][]。其余部分充当其他操作和高级功能的参考。 * 示例程序 * 数据源 * DataStream转换 * data sink * 迭代 * 执行参数 * 容错 * 控制延迟 * 调试 * 本地执行环境 * 收集数据源 * 迭代器数据接收器 ## 示例程序 ## 以下程序是流窗口字数统计应用程序的完整工作示例,它在5秒窗口中对来自Web套接字的单词进行计数。您可以复制并粘贴代码以在本地运行它。 import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStream.print(); env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } } 要运行示例程序,首先从终端使用netcat启动输入流: nc -lk 9999 只需键入一些字符,上述程序就可以产生一个输出:输出是对输入字符的统计统计程序的输入。如果要查看大于1的计数,请在5秒内反复键入相同的单词(如果不能快速输入,则将窗口大小从5秒增加☺)。 ## 数据源 ## 源是您的程序从中读取数据的来源。您可以使用以下方法将源附加到您的程序`StreamExecutionEnvironment.addSource(sourceFunction)`。Flink附带了许多预先实现的源函数,但您可以通过实现`SourceFunction` 接口得到自定义的非并行源,或者通过实现`ParallelSourceFunction`接口或继承`RichParallelSourceFunction`来实现自定义的并行源。 有几个预定义的流源可通过`StreamExecutionEnvironment`访问: 基于文件的: * `readTextFile(path)`\-逐行读取复合 `TextInputFormat`格式的文本文件,并将它们作为字符串返回。 * `readFile(fileInputFormat, path)` - 按指定的文件输入格式指定读取(一次)文件。 * `readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)` - 这是前两个内部调用的方法。它用给定的`fileInputFormat`格式读取`path`路径指向的文件内容。根据`watchType`的值,这个source可以定期监视(每`interval`ms)path指向的文件是否有新数据(`FileProcessingMode.PROCESS_CONTINUOUSLY`)的路径,或者处理当前在路径中的数据并退出(`FileProcessingMode.PROCESS_ONCE`)。使用`pathFilter`,用户可以进一步排除正在处理的文件。 *实现:* Flink将文件读取过程分为两个子任务,即*目录监控*和*数据读取*。这两个子任务是由两个单独的实体实现。目录监视由单个**非并行**(并行性= 1)任务实现。读取过程是并行运行的,它的并行度等于job的并行度。目录监视任务主要是目录的扫描(定期或仅一次,具体取决于`watchType`的值)。数据读取是并行的:找到要处理的文件,将它们进行切分,并将切分的每一块分配给stream readers。stream readers才是读取实际文件数据。分割后的每一个部分仅仅会交给一个reader读取,并且一个reader可以逐个读取多个拆分部分数据。 *重要笔记:* 1. 如果`watchType`的值设置为`FileProcessingMode.PROCESS_CONTINUOUSLY`,则一旦文件被修改了,文件的全部内容将再次被处理。这可能打破“exactly-once”的语义,因为在文件末尾追加数据将导致文件**所有**内容都被重新处理一次。 2. 如果`watchType`值设置为`FileProcessingMode.PROCESS_ONCE`,则source扫描path**一次**并退出,并且不会等待reader完成文件内容的读取。当然,reader会继续阅读,直到所有文件内容均读取完毕。注意:source的关闭将导致不再会有新的检查点(checkpoint)。这可能会导致节点故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。 基于socket的source: * `socketTextStream` - 从套接字读取。元素可以用分隔符分隔。 基于集合: * `fromCollection(Collection)` - 从Java Java.util.Collection创建数据流。集合中的所有元素必须属于同一类型。 * `fromCollection(Iterator, Class)` - 从迭代器创建数据流。class指定了迭代器返回的元素的数据类型。 * `fromElements(T ...)` - 从给定的对象序列创建数据流。所有对象必须属于同一类型。 * `fromParallelCollection(SplittableIterator, Class)` - 并行地从迭代器创建数据流。class指定了迭代器返回的元素的数据类型。 * `generateSequence(from, to)` - 并行生成给定interval的数字序列。 自定义source: * `addSource` - 例如,要从Apache Kafka读取,您可以使用 `addSource(new FlinkKafkaConsumer08<>(...))`。请参阅[连接器][Link 2]以获取更多详 ## DataStream转换 ## 有关可用流转换的概述,请参阅[DataStream Transformation][stream transformation]。 ## data sink ## data sink使用把DataStream并将传输到文件,套接字,外部系统或打印它们。Flink带有各种内置输出格式API: * `writeAsText()`/ `TextOutputFormat`\- 按字符串顺序写入元素。通过调用每个元素的\*toString()\*方法获得字符串。 * `writeAsCsv(...)`/ `CsvOutputFormat`\- 将元组写为逗号分隔值的csv文件。行和字段分隔符是可配置的。每个字段的值来自对象的\*toString()\*方法。 * `print()`/ `printToErr()` - 在标准输出/标准错误流上打印每个元素的*toString()值。可选地,可以为输出设置前缀(msg)。这有助于区分不同的print*调用。如果并行度大于1,则输出也将以生成输出的任务的标识符为前缀。 * `writeUsingOutputFormat()`/ `FileOutputFormat`\- 自定义文件输出的方法和基类。支持自定义对象到byte的转换。 * `writeToSocket` - 将元素写入套接字 ,使用`SerializationSchema`进行序列化 * `addSink` - 调用自定义接收器功能。Flink捆绑了其他系统(如Apache Kafka)的连接器,这些系统可以作为sink目的地。 注意:`DataStream`的`write*()`方法主要用于调试目的,他们没有参与Flink的检查点机制,这意味着这些函数通常具有至少一次的语义。数据什么时候flush到目标文件系统取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。 为了可靠准确地,使用`flink-connector-filesystem`,可以保证 exactly-once将流传送到文件系统。此外,通过`.addSink(...)`方法的自定义实现sink也可以引入Flink的exactly-once语义检查点。 ## 迭代 ## 迭代流程序实现step函数并将其嵌入到`IterativeStream`中。由于DataStream可能永远不会终止,因此没有最大迭代次数。相反,我们需要指定流的哪个部分输入到迭代,哪个部分使用`split`转换算子或`filter`算子过滤掉(不迭代)。在这里,我们展示了使用filter的示例。首先,我们定义一个`IterativeStream` IterativeStream<Integer> iteration = input.iterate(); 然后,我们使用一系列转换指定将在循环内执行的逻辑(这里是一个简单的`map`转换) DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */); 为了让迭代停止,即定义迭代终止,可以调用`IterativeStream`的`closeWith(feedbackStream)`方法。输入到`closeWith`函数的DataStream 将反馈给迭代头(进入下一次迭代)。常见的做法是使用过滤器filter来分离方迭代头反馈的流和向后传播的流的一部分。这些filter可以例如定义“终止”逻辑,其中允许元素向下游传播而不是反馈给迭代头。 iteration.closeWith(iterationBody.filter(/* one part of the stream */)); DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */); 例如,这里是从一系列整数中连续减去1直到它们达到零的程序: DataStream<Long> someIntegers = env.generateSequence(0, 1000); IterativeStream<Long> iteration = someIntegers.iterate(); DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { return value - 1 ; //会重复迭代 } }); DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return (value > 0); // 大于0的值将会反馈给迭代头,进入下次迭代 } }); iteration.closeWith(stillGreaterThanZero); DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return (value <= 0); } }); ## 执行参数设置 ## `StreamExecutionEnvironment`包含`ExecutionConfig`允许为运行时参数设置。 有关大多数参数的说明,请参阅[参数配置][Link 3]。这些参数特别适用于DataStream API: ### 容错 ### [State&Checkpointing][State_Checkpointing]描述了如何启用和配置Flink的检查点机制。 ### 延迟控制 ### 默认情况下,元素不会逐个传输到网络上(这会导致不必要的网络流量),但数据会被缓冲。可以在Flink配置文件中设置缓冲区的大小(缓冲区中的数据就是实际在计算机之间传输的数据量)。虽然修改缓冲区大小有利于优化吞吐量,尤其是当传入流速度不够快时,可能会导致延迟问题。为了控制吞吐量和延迟,您可以通过`env.setBufferTimeout(timeoutMillis)`在执行环境中(或单个运算算子中)上设置缓冲区填充的最长等待时间。在此之后,即使缓冲区未满,也会自动发送缓冲区,这个的默认值为100毫秒。 用法: LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); 为了最大化吞吐量,设置超时时间为-1:`setBufferTimeout(-1)`也就是只有在缓冲区满了的时候数据才会在网络上传输。 要最小化延迟,请将超时设置为接近0的值(例如5或10 ms)。应避免缓冲区超时为0,因为它可能导致严重的性能下降。 ## 调试 ## 在分布式集群中运行流式程序之前,最好确保实现的算法按预期工作。因此,实施数据分析的程序通常:检查结果,调试和改进的增量过程。 Flink通过支持IDE内的本地调试,测试数据的注入和结果数据的收集,提供了显著简化数据分析程序开发过程的功能。本节提供了一些如何简化Flink程序开发的提示。 ### 本地执行环境 ### A `LocalStreamEnvironment`在创建它的同一JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点并轻松调试程序。 创建LocalEnvironment并使用如下: final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); DataStream<String> lines = env.addSource(/* some source */); // build your program env.execute(); ### 集合数据源 ### Flink提供了特殊的数据源,这些数据源由Java集合支持,以方便测试。一旦程序经过测试,源和接收器可以很容易地被读取/写入外部系统的源和接收器替换。 集合数据源可以如下使用: final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // Create a DataStream from a list of elements DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5); // Create a DataStream from any Java collection List<Tuple2<String, Integer>> data = ... DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data); // Create a DataStream from an Iterator Iterator<Long> longIt = ... DataStream<Long> myLongs = env.fromCollection(longIt, Long.class); \*\*注意:\*\*目前,集合数据源要求数据类型和迭代器实现 `Serializable`接口。此外,集合数据源不能并行执行(并行度= 1)。 ### 迭代器数据接收器 ### Flink还提供了一个sink来收集DataStream结果,以便进行测试和调试。它可以使用如下: import org.apache.flink.streaming.experimental.DataStreamUtils DataStream<Tuple2<String, Integer>> myResult = ... Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult) [ 回到顶部][Link 4] **注意:** `flink-streaming-contrib`模块已从Flink 1.5.0中删除。它的类已被移入`flink-streaming-java`和`flink-streaming-scala`。 [Link 1]: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html [Flink]: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html#anatomy-of-a-flink-program [stream transformation]: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/index.html [Link 2]: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/index.html [Link 3]: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/execution_configuration.html [State_Checkpointing]: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html [Link 4]: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#top
还没有评论,来说两句吧...