pyspark案例系列2-rdd与DataFrame相互转换

Love The Way You Lie 2022-10-14 12:53 43阅读 0赞

一.问题描述

今天写pyspark脚本的时候遇到一个问题,需要类似于关系型数据库group by再聚合的操作,尝试通过rdd来写,发现不好实现。
于是想到了使用DataFrame,通过类sql的group by直接进行实现。

二.解决方案

将rdd直接转为DataFrame。

首先进行配置:
SparkSession是Spark SQL的入口

  1. from pyspark import SparkContext, SparkConf
  2. from pyspark.sql.session import SparkSession
  3. spark_conf = SparkConf().setMaster("local[*]").setAppName("FindCommonFriends")
  4. sc = SparkContext(conf = spark_conf)
  5. spark = SparkSession(sc)

代码:

  1. -- 通过rdd生产DataFrame
  2. df = spark.createDataFrame(rdd)
  3. -- 通过rdd生产DataFrame,并给列进行命名
  4. df = spark.createDataFrame(rdd,['a', 'b'])

下面是一段生成group by的pyspark代码:

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. from pyspark import SparkContext, SparkConf
  4. from pyspark.sql.session import SparkSession
  5. from pyspark.sql.functions import collect_list
  6. from pyspark.sql.functions import udf, col
  7. # 设置Spark程序运行的地方,此处设置运行在本地模式,启动2个线程分析数据
  8. spark_conf = SparkConf().setMaster("local[*]").setAppName("FindCommonFriends")
  9. sc = SparkContext(conf = spark_conf)
  10. spark = SparkSession(sc)
  11. # 从本地文件生成rdd
  12. filename = 'file:///home/pyspark/friends.txt'
  13. rdd = sc.textFile(filename)
  14. # 根据分隔符进行分割并排序
  15. rdd1=rdd.map(lambda x:x.split("\t")).sortByKey(lambda x:x[0])
  16. # 根据rdd生成DataFrame
  17. df1 = spark.createDataFrame(rdd1,['a', 'b'])
  18. df2 = df1.groupBy("a").agg(collect_list('b').alias('b_new1')).orderBy("b_new1")
  19. df3=df2.groupBy("b_new1").agg(collect_list('a').alias('a_new1')).orderBy("b_new1")
  20. #df2.show()
  21. #df3.show()

参考:

1.https://www.cnblogs.com/Lee-yl/p/9759657.html

发表评论

表情:
评论列表 (有 0 条评论,43人围观)

还没有评论,来说两句吧...

相关阅读