SparkSQL(三):SparkSQL核心编程实践 ╰半橙微兮° 2022-12-23 12:29 304阅读 0赞 ### 文章目录 ### * 一、IDEA 开发 SparkSQL * 二、用户自定义函数 * * 2.1 UDF * 2.2 UDAF * 三、数据的加载和保存 * * 3.1 通用的加载和保存方式 * 3.2 Parquet * 3.3 JSON * 3.4 CSV * 3.5 MySQL * 3.6 Hive # 一、IDEA 开发 SparkSQL # object SparkSqlTest { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("SparkSqlTest") //创建SparkSession对象 val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() /** * RDD=>DataFrame=>DataSet转换需要引入隐式转换规则,否则无法转换 * spark不是包名,是上下文环境对象名 * 该对象必须是val声明 */ import spark.implicits._ //读取json文件,创建DataFrame val df: DataFrame = spark.read.json("input/user.json") df.show() /** * SQL风格语法 */ df.createOrReplaceTempView("user") spark.sql("select age from user").show() /** * DSL风格语法 */ df.select("username", "age").show() /** * RDD=>DataFrame=>DataSet */ val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "jack", 30), (2, "rose", 28))) val dataFrame: DataFrame = rdd.toDF() dataFrame.show() println("---------------------") val dataSet: Dataset[Row] = dataFrame.as("User") dataSet.show() /** * DataSet=>DataFrame=>RDD */ val df2 = dataSet.toDF() val rdd2 = df2.rdd //RDD返回的RDD类型为Row,里面提供的getXXX方法可以获取字段值,类似jdbc处理结果集,但是索引从0开始 rdd2.foreach(t => println(t.getString(1))) /** * RDD=>DataSet */ val newDS = rdd.map { case (id, name, age) => User(id, name, age) }.toDS() /** * DataSet=>=>RDD */ newDS.rdd //释放资源 spark.stop() } } case class User(id: Int, name: String, age: Int) # 二、用户自定义函数 # 用户可以通过`spark.udf`功能添加自定义函数,实现自定义功能。 ## 2.1 UDF ## ① 创建`DataFrame` scala> var df = spark.read.json("/root/user.json") df: org.apache.spark.sql.DataFrame = [age: bigint, username: string] ② 注册`UDF` scala> spark.udf.register("addName",(x:String)=>"Name="+x) res0: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType))) ③ 创建临时表 scala> df.createOrReplaceTempView("user") ④ 应用`UDF` scala> spark.sql("select addName(username),age from user").show() +---------------------+---+ |UDF:addName(username)|age| +---------------------+---+ | Name=jack| 18| +---------------------+---+ ## 2.2 UDAF ## 强类型的`Dataset`和弱类型的`DataFrame`都提供了相关的聚合函数, 如`count()`、`countDistinct()`、`avg()`、`max()`、`min()`。除此之外,用户可以设定自己的自定义聚合函数,通过继承`UserDefinedAggregateFunction`来实现用户自定义聚合函数。 **①UDAF - 弱类型** object UDAFTest2 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("UDAFTest2") val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import spark.implicits._ val df = spark.read.json("input/user.json") df.createOrReplaceTempView("user") //创建聚合函数 val myAvgUDAF = new MyAvgUDAF //注册该聚合函数 spark.udf.register("avgAge", myAvgUDAF) spark.sql("select avgAge(age) from user").show() spark.stop() } } class MyAvgUDAF extends UserDefinedAggregateFunction { // 聚合函数输入参数的数据类型 override def inputSchema: StructType = StructType(Array(StructField("age", LongType))) // 聚合函数缓存区中值的数据类型(age,count) override def bufferSchema: StructType = { StructType(Array(StructField("age", LongType), StructField("count", LongType))) } // 函数返回值的数据类型 override def dataType: DataType = DoubleType // 稳定性:对于相同的输入是否一直返回相同的输出。 override def deterministic: Boolean = true // 函数缓冲区初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { // 存年龄的总和 buffer(0) = 0L // 存年龄的个数 buffer(1) = 0L } // 更新缓冲区中的数据 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // 合并缓冲区 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1).toDouble } 打印: +--------------+ |myavgudaf(age)| +--------------+ | 19.0| +--------------+ **②UDAF - 弱类型** object UDAFTest3 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("UDAFTest3") val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import spark.implicits._ val df = spark.read.json("input/user.json") // 封装DataFrame为DataSet val ds = df.as[MyUser] // 创建强类型聚合函数 val udaf2 = new MyAvgUDAF2 // 将聚合函数转换为查询的列 val column: TypedColumn[MyUser, Double] = udaf2.toColumn ds.select(column).show() spark.stop() } } //输入数据类型 case class MyUser(username: String, age: Long) //缓存类型 case class AgeBuffer(var sum: Long, var count: Long) /** * 定义继承org.apache.spark.sql.expressions.Aggregator * 参数:输入数据类型/缓存类型/输出类型 */ class MyAvgUDAF2 extends Aggregator[MyUser, AgeBuffer, Double] { override def zero: AgeBuffer = AgeBuffer(0L, 0L) override def reduce(b: AgeBuffer, a: MyUser): AgeBuffer = { b.sum = b.sum + a.age b.count = b.count + 1 b } override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = { b1.sum = b1.sum + b2.sum b1.count = b1.count + b2.count b1 } override def finish(reduction: AgeBuffer): Double = { (reduction.sum / reduction.count).toDouble } override def bufferEncoder: Encoder[AgeBuffer] = Encoders.product override def outputEncoder: Encoder[Double] = Encoders.scalaDouble } # 三、数据的加载和保存 # ## 3.1 通用的加载和保存方式 ## `SparkSQL`提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的`API`,根据不同的参数读取和保存不同格式的数据,`SparkSQL`默认读取和保存的文件格式为`parquet`。 **① 加载数据** `spark.read.load`是加载数据的通用方法 scala> spark.read. csv format jdbc json load option options orc parquet schema table text textFile 如果读取不同格式的数据,可以对不同的数据格式进行设定 scala> spark.read.format("…")[.option("…")].load("…") * **format("…")**:指定加载的数据类型,包括`csv`、`jdbc`、`json`、`orc`、`parquet`和`textFile` * **load("…")**:在`csv`、`jdbc`、`json`、`orc`、`parquet`和`textFile`格式下需要传入加载数据的路径 * **option("…")**:在`jdbc`格式下需要传入`JDBC`相应参数,`url`、`user`、`password`和`dbtable` 前面都是使用`read API`先把文件加载到`DataFrame`然后再查询,其实,我们也可以直接在文件上进行查询: 文件格式.`文件路径` scala>spark.sql("select * from json.`/opt/module/data/user.json`").show **② 保存数据** `df.write.save`是保存数据的通用方法 scala>df.write. csv jdbc json orc parquet textFile… … 如果保存不同格式的数据,可以对不同的数据格式进行设定 scala>df.write.format("…")[.option("…")].save("…") * f**ormat("…")**:指定保存的数据类型,包括`csv`、`jdbc`、`json`、`orc`、`parquet`和`textFile`。 * **save ("…")**:在`csv`、`orc`、`parquet`和`textFile`格式下需要传入保存数据的路径。 * **option("…")**:在`jdbc`格式下需要传入`JDBC`相应参数,`url`、`user`、`password`和`dbtable` 保存操作可以使用`SaveMode`,用来指明如何处理数据,使用`mode()`方法来设置。 有一点很重要: 这些`SaveMode`都是没有加锁的,也不是原子操作。 `SaveMode`是一个枚举类,其中的常量包括: <table> <thead> <tr> <th>Scala/Java</th> <th>Any Language</th> <th>Meaning</th> </tr> </thead> <tbody> <tr> <td>SaveMode.ErrorIfExists(default)</td> <td>“error”(default)</td> <td>如果文件已经存在则抛出异常</td> </tr> <tr> <td>SaveMode.Append</td> <td>“append”</td> <td>如果文件已经存在则追加</td> </tr> <tr> <td>SaveMode.Overwrite</td> <td>“overwrite”</td> <td>如果文件已经存在则覆盖</td> </tr> <tr> <td>SaveMode.Ignore</td> <td>“ignore”</td> <td>如果文件已经存在则忽略</td> </tr> </tbody> </table> 例: df.write.mode("append").json("/opt/module/data/output") ## 3.2 Parquet ## `Spark SQL`的默认数据源为`Parquet`格式。`Parquet`是一种能够有效存储嵌套数据的列式存储格式。 数据源为`Parquet`文件时,`Spark SQL`可以方便的执行所有的操作,不需要使用`format`。修改配置项`spark.sql.sources.default`,可修改默认数据源格式。 **① 加载数据** scala> val df = spark.read.load("examples/src/main/resources/users.parquet") scala> df.show **② 保存数据** scala> var df = spark.read.json("/opt/module/data/input/people.json") //保存为parquet格式 scala> df.write.mode("append").save("/opt/module/data/output") ## 3.3 JSON ## `Spark SQL`能够自动推测`JSON`数据集的结构,并将它加载为一个`Dataset[Row]`。可以通过`SparkSession.read.json()`去加载`JSON`文件。 注意:`Spark`读取的`JSON`文件不是传统的`JSON`文件,每一行都应该是一个`JSON`串。格式如下: { "name":"Michael"} { "name":"Andy", "age":30} { "name":"Justin", "age":19} **加载JSON文件**: val path = "/opt/module/spark-local/people.json" val peopleDF = spark.read.json(path) ## 3.4 CSV ## `Spark SQL`可以配置`CSV`文件的列表信息,读取`CSV`文件,`CSV`文件的第一行设置为数据列。 spark.read.format("csv").option("sep", ";").option("inferSchema", "true") .option("header", "true").load("data/user.csv") ## 3.5 MySQL ## `Spark SQL`可以通过`JDBC`从关系型数据库中读取数据的方式创建`DataFrame`,通过对`DataFrame`一系列的计算后,还可以将数据再写回关系型数据库中。如果使用`spark-shell`操作,可在启动`shell`时指定相关的数据库驱动路径或者将相关的数据库驱动放到`spark`的类路径下。 bin/spark-shell --jars mysql-connector-java-5.1.27-bin.jar 我们这里只演示在`IDEA`中通过`JDBC`对`MySQL`进行操作 **① 导入依赖** <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency> **② 读取数据** val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL") //创建SparkSession对象 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._ //方式1:通用的load方法读取 spark.read.format("jdbc") .option("url", "jdbc:mysql://hadoop1:3306/spark-sql") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "1234") .option("dbtable", "user") .load().show //方式2:通用的load方法读取 参数另一种形式 spark.read.format("jdbc") .options(Map("url"->"jdbc:mysql://hadoop1:3306/spark-sql?user=root&password=1234", "dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show //方式3:使用jdbc方法读取 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "1234") val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop1:3306/spark-sql", "user", props) df.show //释放资源 spark.stop( **③ 写入数据** case class User2(name: String, age: Long) ...... val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL") //创建SparkSession对象 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._ val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30))) val ds: Dataset[User2] = rdd.toDS //方式1:通用的方式 format指定写出类型 ds.write .format("jdbc") .option("url", "jdbc:mysql://hadoop1:3306/spark-sql") .option("user", "root") .option("password", "1234") .option("dbtable", "user") .mode(SaveMode.Append) .save() //方式2:通过jdbc方法 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "1234") ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop1:3306/spark-sql", "user", props) //释放资源 spark.stop() ## 3.6 Hive ## `spark-shell`默认是`Hive`支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。 虽然`spark-shell`默认是支持`Hive`的,但是由于当前使用`Spark2.4.5`版本,和`Hive3.1.2`的版本不兼容,所以无法在对应版本的`spark-shell`中使用`hive`。未来版本会解决这个问题。 **代码操作`Hive`**: ① 导入依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.2</version> </dependency> ② 将`hive-site.xml`文件拷贝到项目的`resources`目录中,代码实现 //创建SparkSession val spark: SparkSession = SparkSession .builder() .enableHiveSupport() .master("local[*]") .appName("sql") .getOrCreate() 注意:在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址: `config("spark.sql.warehouse.dir", "hdfs://hadoop1:8020/user/hive/warehouse")` 若出现权限错误,可以代码最前面增加如下代码解决: `System.setProperty("HADOOP_USER_NAME", "root")`
还没有评论,来说两句吧...