pyspark案例系列2-rdd与DataFrame相互转换
一.问题描述
今天写pyspark脚本的时候遇到一个问题,需要类似于关系型数据库group by再聚合的操作,尝试通过rdd来写,发现不好实现。
于是想到了使用DataFrame,通过类sql的group by直接进行实现。
二.解决方案
将rdd直接转为DataFrame。
首先进行配置:
SparkSession是Spark SQL的入口
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
spark_conf = SparkConf().setMaster("local[*]").setAppName("FindCommonFriends")
sc = SparkContext(conf = spark_conf)
spark = SparkSession(sc)
代码:
-- 通过rdd生产DataFrame
df = spark.createDataFrame(rdd)
-- 通过rdd生产DataFrame,并给列进行命名
df = spark.createDataFrame(rdd,['a', 'b'])
下面是一段生成group by的pyspark代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import udf, col
# 设置Spark程序运行的地方,此处设置运行在本地模式,启动2个线程分析数据
spark_conf = SparkConf().setMaster("local[*]").setAppName("FindCommonFriends")
sc = SparkContext(conf = spark_conf)
spark = SparkSession(sc)
# 从本地文件生成rdd
filename = 'file:///home/pyspark/friends.txt'
rdd = sc.textFile(filename)
# 根据分隔符进行分割并排序
rdd1=rdd.map(lambda x:x.split("\t")).sortByKey(lambda x:x[0])
# 根据rdd生成DataFrame
df1 = spark.createDataFrame(rdd1,['a', 'b'])
df2 = df1.groupBy("a").agg(collect_list('b').alias('b_new1')).orderBy("b_new1")
df3=df2.groupBy("b_new1").agg(collect_list('a').alias('a_new1')).orderBy("b_new1")
#df2.show()
#df3.show()
参考:
1.https://www.cnblogs.com/Lee-yl/p/9759657.html
还没有评论,来说两句吧...