Spark: Converting an RDD to a DataFrame in Scala


Dependency

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.3</version>
    </dependency>
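
If you build with sbt rather than Maven, the equivalent dependency is the single line below (a sketch; `%%` appends the Scala binary version 2.11 automatically, and the version should match your Spark installation):

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.3"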

RDD to DataFrame: specifying the schema with StructType

    package com.zy.sparksql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

    /**
      * RDD to DataFrame: specify the schema explicitly with StructType
      */
    object StructTypeSchema {
      def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()
        // Get the SparkContext
        val sc: SparkContext = sparkSession.sparkContext
        // Set the log level
        sc.setLogLevel("WARN")

        // Read the file
        val textFile: RDD[String] = sc.textFile("D:\\person.txt")
        // Split each line on commas
        val lineArrayRDD: RDD[Array[String]] = textFile.map(_.split(","))
        // Map each split line to a Row
        val rowRDD: RDD[Row] = lineArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
        // Build the schema for the RDD (name, type, nullable, comment)
        val schema: StructType = (new StructType)
          .add("id", IntegerType, true, "id")
          .add("name", StringType, false, "name")
          .add("age", IntegerType, true, "age")
        // Create the DataFrame from the Row RDD and the schema
        val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
        // DSL-style query
        personDF.show()
        // SQL-style query
        // Register the DataFrame as a temporary view first
        personDF.createTempView("person")
        sparkSession.sql("select * from person where id = 3").show()
        sparkSession.stop()
      }
    }
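
To make the listing concrete: assuming a hypothetical `D:\person.txt` with one comma-separated `id,name,age` record per line, for example:

    1,zhangsan,20
    2,lisi,29
    3,wangwu,25

`personDF.show()` would print a table along these lines, and the SQL query would then return only the `id = 3` row:

    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 20|
    |  2|    lisi| 29|
    |  3|  wangwu| 25|
    +---+--------+---+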

RDD to DataFrame: inferring the schema via reflection

    package com.zy.sparksql

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
      * RDD to DataFrame: infer the schema via reflection on a case class
      */

    // Define a case class; its fields become the inferred schema
    case class Person(id: Int, name: String, age: Int)

    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
        // Build the SparkSession, specifying the appName and master (local for testing)
        val sparkSession: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
        // Get the SparkContext
        val sc: SparkContext = sparkSession.sparkContext

        // Set the log level
        sc.setLogLevel("WARN")
        // Load the data
        val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")
        // Split each line on commas
        val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(","))
        // Map each split line to the Person case class
        val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
        // Import the implicits, then convert the RDD to a DataFrame
        import sparkSession.implicits._
        val personDF: DataFrame = personRDD.toDF
        // DSL syntax
        personDF.show()
        personDF.printSchema()
        personDF.select("name").show()
        personDF.filter($"age" > 30).show()
        println("---------------------------------------------")
        // SQL syntax
        // A temporary view must be created first
        personDF.createTempView("person")
        sparkSession.sql("select * from person where id > 1").show()
        sparkSession.stop()
      }
    }
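
With `sparkSession.implicits._` already in scope, the same RDD can also become a strongly typed `Dataset`, and a DataFrame can be turned back into an RDD of `Row`s. A minimal sketch, continuing the listing above (add `Dataset` and `Row` to the `org.apache.spark.sql` import):

    // Typed Dataset instead of an untyped DataFrame
    val personDS: Dataset[Person] = personRDD.toDS()
    personDS.filter(_.age > 30).show()  // field access is checked at compile time
    // DataFrame back to an RDD of Rows
    val rowRDD: RDD[Row] = personDF.rdd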
