Flink Example Development: Batch Processing Examples


Batch processing examples

The following example programs showcase different Flink applications, from simple word counting to graph computation. The code samples illustrate the use of Flink's DataSet API.

The full source code of these examples, and many more, can be found in the flink-examples-batch and flink-examples-streaming modules of the Flink source repository.

  • Running an Example
  • Word Count
  • Page Rank
  • Connected Components
  • Relational Query

Running an Example

To run a Flink program, we assume you have a running Flink cluster. The Quickstart and setup sections describe in detail how to start Flink.

The easiest way is to run the ./bin/start-local.sh script, which starts a local JobManager.

Every binary release of Flink contains an examples directory with jar files for each of the examples on this page.

To run the WordCount example, issue the following command:

    ./bin/flink run ./examples/batch/WordCount.jar

The other examples can be run in a similar way.

Note that many examples run without passing any arguments, in which case they use built-in data. To run WordCount on real data, you have to pass the paths to the data:

    ./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

Note that non-local file systems require a schema prefix, such as hdfs://.
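For example, a run that reads from and writes to HDFS could look like the following (the host, port, and paths are placeholders, not values from the original example):

    ./bin/flink run ./examples/batch/WordCount.jar \
        --input hdfs://namenode:9000/path/to/text \
        --output hdfs://namenode:9000/path/to/result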


Word Count

WordCount is the "Hello World" of big data processing systems. It computes the frequency of words in a text collection.

The algorithm works in two steps: first, the texts are split into individual words; second, the words are grouped and counted.

Java code:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> text = env.readTextFile("/path/to/file");

    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

    counts.writeAsCsv(outputPath, "\n", " ");

    // User-defined functions
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

Scala code:

    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.readTextFile("/path/to/file")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.writeAsCsv(outputPath, "\n", " ")

The WordCount example implements the algorithm described above. It accepts the --input and --output parameters; any text file can be used as test data.

Page Rank

The PageRank algorithm computes the importance of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, every page distributes its current rank over all of its neighbors and computes its new rank as the sum of the rank contributions it received from its neighbors. The PageRank algorithm was popularized by Google, which uses it to rank the importance of web pages in search results.
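For reference, the rank update that each iteration computes corresponds to the standard PageRank formula (a textbook formulation, not spelled out in the original text):

    $$ R(p) = \frac{1 - d}{N} + d \sum_{q \in \mathrm{in}(p)} \frac{R(q)}{|\mathrm{out}(q)|} $$

where d is the dampening factor, N the total number of pages, in(p) the set of pages linking to p, and |out(q)| the number of outgoing links of page q. In the code below, the sum is produced by the join-and-sum steps and the (1 - d)/N term by the Dampener function.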

In this simple example, PageRank is implemented with a bulk iteration and a fixed number of iterations.

Java code:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read the pages and initial ranks by parsing a CSV file
    DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
        .types(Long.class, Double.class);

    // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
    DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

    // set iterative data set
    IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

    DataSet<Tuple2<Long, Double>> newRanks = iteration
        // join pages with outgoing edges and distribute rank
        .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
        // collect and sum ranks
        .groupBy(0).sum(1)
        // apply dampening factor
        .map(new Dampener(DAMPENING_FACTOR, numPages));

    DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
        newRanks,
        newRanks.join(iteration).where(0).equalTo(0)
        // termination condition
        .filter(new EpsilonFilter()));

    finalPageRanks.writeAsCsv(outputPath, "\n", " ");

    // User-defined functions

    public static final class JoinVertexWithEdgesMatch
            implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
                                        Tuple2<Long, Double>> {

        @Override
        public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                         Collector<Tuple2<Long, Double>> out) {
            Long[] neighbors = adj.f1;
            double rank = page.f1;
            double rankToDistribute = rank / ((double) neighbors.length);

            for (int i = 0; i < neighbors.length; i++) {
                out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
            }
        }
    }

    public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
        private final double dampening, randomJump;

        public Dampener(double dampening, double numVertices) {
            this.dampening = dampening;
            this.randomJump = (1 - dampening) / numVertices;
        }

        @Override
        public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
            value.f1 = (value.f1 * dampening) + randomJump;
            return value;
        }
    }

    public static final class EpsilonFilter
            implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

        @Override
        public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
            return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
        }
    }

Scala code:

    // User-defined types
    case class Link(sourceId: Long, targetId: Long)
    case class Page(pageId: Long, rank: Double)
    case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read the pages and initial ranks by parsing a CSV file
    val pages = env.readCsvFile[Page](pagesInputPath)

    // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
    val links = env.readCsvFile[Link](linksInputPath)

    // assign initial ranks to pages
    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))

    // build adjacency list from link input
    val adjacencyLists = links
      // initialize lists
      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
      // concatenate lists
      .groupBy("sourceId").reduce {
        (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
      }

    // start iteration
    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
      currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
            (page, adjacent, out: Collector[Page]) =>
              for (targetId <- adjacent.targetIds) {
                out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
              }
          }
          // collect ranks and sum them up
          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
          .map { p =>
            Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
          }

        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }

        (newRanks, termination)
    }

    val result = finalRanks

    // emit result
    result.writeAsCsv(outputPath, "\n", " ")

The PageRank program implements the above example. It requires the following parameters to run:

    --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>

Input files are plain text files and must be formatted as follows:

  • Pages are represented as a (long) ID and separated by newline characters.
    For example, "1\n2\n12\n42\n63\n" gives five pages with IDs 1, 2, 12, 42, and 63.
  • Links are represented as pairs of page IDs, separated by a space. Links are separated by newline characters.
    For example, "1 2\n2 12\n1 12\n42 63\n" gives four (directed) links: (1)->(2), (2)->(12), (1)->(12), and (42)->(63).

For this simple implementation it is required that each page has at least one incoming and one outgoing link. A page can point to itself.
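Putting this together, a run of the packaged example could look roughly like the following (the jar name and paths are assumptions; check the examples/batch directory of your distribution for the exact name):

    ./bin/flink run ./examples/batch/PageRank.jar \
        --pages /path/to/pages --links /path/to/links \
        --output /path/to/result --numPages 5 --iterations 10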

Connected Components

The Connected Components algorithm identifies the connected parts of a larger graph by assigning the same component ID to all vertices in the same connected part. Like PageRank, Connected Components is an iterative algorithm. In each step, every vertex propagates its current component ID to all of its neighbors. A vertex accepts a component ID from a neighbor if it is smaller than its own component ID.

This implementation uses a delta iteration: vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because later iterations typically deal only with a few outlier vertices.

Java code:

    // read vertex and edge data
    DataSet<Long> vertices = getVertexDataSet(env);
    DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

    // assign the initial component IDs (equal to the vertex ID)
    DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

    // open a delta iteration
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

    // apply the step logic:
    DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

    // close the delta iteration (delta and new workset are identical)
    DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

    // emit result
    result.writeAsCsv(outputPath, "\n", " ");

    // User-defined functions

    public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

        @Override
        public Tuple2<T, T> map(T vertex) {
            return new Tuple2<T, T>(vertex, vertex);
        }
    }

    public static final class UndirectEdge
            implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

        @Override
        public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
            invertedEdge.f0 = edge.f1;
            invertedEdge.f1 = edge.f0;
            out.collect(edge);
            out.collect(invertedEdge);
        }
    }

    public static final class NeighborWithComponentIDJoin
            implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
            return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
        }
    }

    public static final class ComponentIdFilter
            implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                       Tuple2<Long, Long>> {

        @Override
        public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                            Collector<Tuple2<Long, Long>> out) {
            if (value.f0.f1 < value.f1.f1) {
                out.collect(value.f0);
            }
        }
    }

Scala code:

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read vertex and edge data
    // assign the initial components (equal to the vertex id)
    val vertices = getVerticesDataSet(env).map { id => (id, id) }

    // undirected edges by emitting for each input edge the input edge itself and an inverted version
    val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

    // open a delta iteration
    val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
      (s, ws) =>
        // apply the step logic: join with the edges
        val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
          (edge._2, vertex._2)
        }

        // select the minimum neighbor
        val minNeighbors = allNeighbors.groupBy(0).min(1)

        // update if the component of the candidate is smaller
        val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
          (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
            if (newVertex._2 < oldVertex._2) out.collect(newVertex)
        }

        // delta and new workset are identical
        (updatedComponents, updatedComponents)
    }

    verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

The ConnectedComponents program implements the above example. It requires the following parameters to run:

    --vertices <path> --edges <path> --output <path> --iterations <n>

Input files are plain text files and must be formatted as follows:

  • Vertices are represented as IDs and separated by newline characters.
    For example, "1\n2\n12\n42\n63\n" gives five vertices: (1), (2), (12), (42), and (63).
  • Edges are represented as pairs of vertex IDs, separated by a space. Edges are separated by newline characters.
    For example, "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) links: (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
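With such input files, the packaged example could be run roughly as follows (the jar name and paths below are assumptions; check the examples/batch directory of your distribution for the exact name):

    ./bin/flink run ./examples/batch/ConnectedComponents.jar \
        --vertices /path/to/vertices --edges /path/to/edges \
        --output /path/to/result --iterations 10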

Relational Query

The Relational Query example assumes two tables, one with orders and one with lineitems, as specified by the TPC-H decision support benchmark. TPC-H is a standard benchmark of the database industry. See below for instructions on how to generate the input data.

The example implements the following SQL query:

    SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
        FROM orders, lineitem
    WHERE l_orderkey = o_orderkey
        AND o_orderstatus = "F"
        AND YEAR(o_orderdate) > 1993
        AND o_orderpriority LIKE "5%"
    GROUP BY l_orderkey, o_shippriority;

The Flink program that implements the above query looks as follows.

Java code:

    // get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
    DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
    // get lineitem data set: (orderkey, extendedprice)
    DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

    // orders filtered by year: (orderkey, custkey)
    DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if(!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if(!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0,4).types(Integer.class, Integer.class);

    // join orders with lineitems: (orderkey, shippriority, extendedprice)
    DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
            .where(0).equalTo(0)
            .projectFirst(0,1).projectSecond(1)
            .types(Integer.class, Integer.class, Double.class);

    // extendedprice sums: (orderkey, shippriority, sum(extendedprice))
    DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);

    // emit result
    priceSums.writeAsCsv(outputPath);

The Relational Query program implements the above query. It requires the following parameters to run:

    --orders <path> --lineitem <path> --output <path>

The orders and lineitem files can be generated using the data generator tool (DBGEN) of the TPC-H benchmark suite. Take the following steps to generate arbitrarily large input files for the provided Flink program:

  1. Download and unpack DBGEN.
  2. Copy makefile.suite to makefile and perform the following changes:

    DATABASE = DB2
    MACHINE = LINUX
    WORKLOAD = TPCH
    CC = gcc

  3. Build DBGEN using make.

  4. Generate the lineitem and orders relations using dbgen. A scale factor (-s) of 1 results in a generated data set of about 1 GB in size.

    ./dbgen -T o -s 1
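Assuming the relational query example has been packaged into a jar (the jar name below is a placeholder; the source lives in the flink-examples-batch module), it could then be run on the generated files (dbgen typically writes orders.tbl and lineitem.tbl) roughly like this:

    ./bin/flink run /path/to/RelationalQuery.jar \
        --orders /path/to/orders.tbl --lineitem /path/to/lineitem.tbl \
        --output /path/to/result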
