案例实操:Azkaban调度spark作业

柔情只为你懂 2023-10-03 15:36 89阅读 0赞

新建AccessLogDriverCluster类

img

  1. package com.it19gong.clickproject;
  2. import java.sql.PreparedStatement;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import org.apache.spark.SparkConf;
  6. import org.apache.spark.api.java.JavaRDD;
  7. import org.apache.spark.api.java.JavaSparkContext;
  8. import org.apache.spark.api.java.function.Function;
  9. import org.apache.spark.api.java.function.VoidFunction;
  10. import org.apache.spark.sql.DataFrame;
  11. import org.apache.spark.sql.Row;
  12. import org.apache.spark.sql.RowFactory;
  13. import org.apache.spark.sql.SQLContext;
  14. import org.apache.spark.sql.types.DataTypes;
  15. import org.apache.spark.sql.types.StructField;
  16. import org.apache.spark.sql.types.StructType;
  17. public class AccessLogDriverCluster {
  18. static DBHelper db1=null;
  19. public static void main(String[] args) throws Exception {
  20. // 创建SparkConf、JavaSparkContext、SQLContext
  21. SparkConf conf = new SparkConf()
  22. .setAppName("RDD2DataFrameProgrammatically");
  23. JavaSparkContext sc = new JavaSparkContext(conf);
  24. SQLContext sqlContext = new SQLContext(sc);
  25. // 第一步,创建一个普通的RDD,但是,必须将其转换为RDD<Row>的这种格式
  26. //获取昨天时间
  27. JavaRDD<String> lines = sc.textFile("hdfs://node1/data/clickLog/2019/08/31");
  28. // 分析一下
  29. // 它报了一个,不能直接从String转换为Integer的一个类型转换的错误
  30. // 就说明什么,说明有个数据,给定义成了String类型,结果使用的时候,要用Integer类型来使用
  31. // 而且,错误报在sql相关的代码中
  32. // 所以,基本可以断定,就是说,在sql中,用到age<=18的语法,所以就强行就将age转换为Integer来使用
  33. // 但是,肯定是之前有些步骤,将age定义为了String
  34. // 所以就往前找,就找到了这里
  35. // 往Row中塞数据的时候,要注意,什么格式的数据,就用什么格式转换一下,再塞进去
  36. JavaRDD<Row> clickRDD = lines.map(new Function<String, Row>() {
  37. private static final long serialVersionUID = 1L;
  38. @Override
  39. public Row call(String line) throws Exception {
  40. String itr[] = line.split(" ");
  41. String ip = itr[0];
  42. String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
  43. String url = itr[6];
  44. String upFlow = itr[9];
  45. return RowFactory.create(
  46. ip,
  47. date,
  48. url,
  49. Integer.valueOf(upFlow)
  50. );
  51. }
  52. });
  53. // 第二步,动态构造元数据
  54. // 比如说,id、name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql db里
  55. // 或者是配置文件中,加载出来的,是不固定的
  56. // 所以特别适合用这种编程的方式,来构造元数据
  57. List<StructField> structFields = new ArrayList<StructField>();
  58. structFields.add(DataTypes.createStructField("ip", DataTypes.StringType, true));
  59. structFields.add(DataTypes.createStructField("date", DataTypes.StringType, true));
  60. structFields.add(DataTypes.createStructField("url", DataTypes.StringType, true));
  61. structFields.add(DataTypes.createStructField("upflow", DataTypes.IntegerType, true));
  62. StructType structType = DataTypes.createStructType(structFields);
  63. // 第三步,使用动态构造的元数据,将RDD转换为DataFrame
  64. DataFrame studentDF = sqlContext.createDataFrame(clickRDD, structType);
  65. // 后面,就可以使用DataFrame了
  66. studentDF.registerTempTable("log");
  67. DataFrame sumFlowDF = sqlContext.sql("select ip,sum(upflow) as sum from log group by ip order by sum desc");
  68. db1=new DBHelper();
  69. final String sql="insert into upflow(ip,sum) values(?,?) ";
  70. sumFlowDF.javaRDD().foreach(new VoidFunction<Row>() {
  71. @Override
  72. public void call(Row t) throws Exception {
  73. // TODO Auto-generated method stub
  74. PreparedStatement pt = db1.conn.prepareStatement(sql);
  75. pt.setString(1,t.getString(0));
  76. pt.setString(2,String.valueOf(t.getLong(1)));
  77. pt.executeUpdate();
  78. }
  79. });;
  80. }
  81. }

打包

img

img

报错

img

删除apptest文件

img

再次打包

img

把打好的包拷贝出来

img

并且重命名

img

img

  1. vim project.sh
  2. /opt/modules/spark-1.5.1-bin-hadoop2.6/bin/spark-submit --class com.it19gong.clickproject.AccessLogDriverCluster --num-executors 3 --driver-memory 100m --executor-memory 100m --executor-cores 3 --files /opt/modules/hive/conf/hive-site.xml --driver-class-path /opt/modules/hive/lib/mysql-connector-java-5.1.28.jar /home/hadoop/sparkproject.jar

把原来的包删除

img

上传新的包

img

执行脚本

img

img

mysql数据多了两条

img

打开azkaban的页面,这里再次提醒要用谷歌浏览器

img

新建spark.job文件

  1. #command.job
  2. type=command
  3. command=bash project.sh

打包成zip包

img

img

img

img

img

上传zip包

img

img

开始执行

img

img

img

img

mysql数据库多了两天数据

img

参考链接:https://www.cnblogs.com/braveym/p/12259956.html

发表评论

表情:
评论列表 (有 0 条评论,89人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spark 案例

    案例实操   Spark Shell 仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在 IDE 中编制程序,然后打成 jar 包,然后提交到集群,最常用...

    相关 azkaban --- 案例

    目录 案例一 : 输出Hello World 案例二 :作业依赖 案例三 :内嵌工作流 案例四 :自动失败 案例五 :手动失败 案例六 :JavaProcess

    相关 Spark案例

    1. 数据结构:时间戳,省份,城市,用户,广告,中间字段使用空格分割。 `数据` 链接:https://pan.baidu.com/s/1N3aq3Ps