PySpark—DataFrame笔记

电玩女神 2021-09-24 05:26 1627阅读 0赞

 本人博客园同篇文章:PySpark—DataFrame笔记
 DataFrame基础 + 示例,为了自查方便汇总了关于PySpark-dataframe相关知识点,集合了很多篇博客和知乎内容,结合了自身实践,加上了更多示例和讲解方便理解,本文内容较多配合目录看更方便。

 如有任何问题或者文章错误欢迎大家留言批评指正,感谢阅读。

什么是DataFrame?

  1. DataFrames通常是指本质上是表格形式的数据结构。它代表行,每个行都包含许多观察值。
  2. 行可以具有多种数据格式(异构),而列可以具有相同数据类型(异构)的数据。
  3. DataFrame通常除数据外还包含一些元数据。例如,列名和行名。
  4. 我们可以说DataFrames是二维数据结构,类似于SQL表或电子表格。
  5. DataFrames用于处理大量结构化和半结构化数据

连接本地spark

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession \
  3. .builder \
  4. .appName('my_app_name') \
  5. .getOrCreate()

Spark初始化设置

  1. from pyspark.sql import SparkSession
  2. # SparkSession 配置
  3. spark = SparkSession.builder \
  4. .appName("My test") \
  5. .getOrCreate()
  6. # spark.conf.set("spark.executor.memory", "1g")
  7. spark.conf.set("spark.sql.execution.arrow.enabled", "true")
  8. sc = spark.sparkContext
  9. sc.setLogLevel("WARN")

SparkSession 介绍

参考文章:
SparkSession思考与总结:https://blog.csdn.net/yyt8582/article/details/81840031
SparkSession的认识:https://www.cnblogs.com/zzhangyuhang/p/9039695.html
spark配置:https://spark.apache.org/docs/latest/configuration.html
## (1)为何出现SparkSession
 SparkSession 本质上是SparkConf、SparkContext、SQLContext、HiveContext和StreamingContext这些环境的集合,避免使用这些来分别执行配置、Spark环境、SQL环境、Hive环境和Streaming环境。SparkSession现在是读取数据、处理元数据、配置会话和管理集群资源的入口。

 (2)SparkSession创建RDD

  1. from pyspark.sql.session import SparkSession
  2. if __name__ == "__main__":
  3. spark = SparkSession.builder.master("local") \
  4. .appName("My test") \
  5. .config("spark.some.config.option", "some-value") \
  6. .getOrCreate()
  7. sc = spark.sparkContext
  8. data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  9. rdd = sc.parallelize(data)

 (3)SparkSession实例化参数:

 通过静态类Builder来实例化。Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置。可以通SparkSession.builder 来创建一个 SparkSession 的实例,并通过 stop 函数来停止 SparkSession。Builder又有很多方法,包括:
format_png
Builder 的方法如下:

  1. 1appName函数
  2. appName(String name)
  3. 用来设置应用程序名字,会显示在Spark web UI
  4. 2master函数
  5. master(String master)
  6. 设置Spark master URL 连接,比如"local" 设置本地运行,"local[4]"本地运行4cores,或则"spark://master:7077"运行在spark standalone 集群。
  7. 3config函数
  8. 这里有很多重载函数。其实从这里我们可以看出重载函数,是针对不同的情况,使用不同的函数,但是他们的功能都是用来设置配置项的。
  9. spark.some.config.optionsome-valueconfiguation性质的键值对完成。例如spark configuration propertiesyarn properties
  10. 1、.config(SparkConf conf)
  11. 根据给定的SparkConf设置配置选项列表。
  12. 2config(String key, boolean value)
  13. 设置配置项,针对值为boolean
  14. 3config(String key, double value)
  15. 设置配置项,针对值为double
  16. 4config(String key, long value)
  17. 设置配置项,针对值为long
  18. 5config(String key, String value)
  19. 设置配置项,针对值为String
  20. 4getOrCreate函数
  21. getOrCreate()
  22. 获取已经得到的 SparkSession,或则如果不存在则创建一个新的基于builder选项的SparkSession
  23. 5enableHiveSupport函数
  24. 表示支持Hive,包括 链接持久化Hive metastore, 支持Hive serdes, Hive用户自定义函数
  25. 6withExtensions函数
  26. withExtensions(scala.Function1<SparkSessionExtensions,scala.runtime.BoxedUnit> f)
  27. 这允许用户添加Analyzer rules, Optimizer rules, Planning Strategies 或者customized parser.这一函数我们是不常见的。

DF创建

 (1)直接创建

  1. # 直接创建Dataframe
  2. df = spark.createDataFrame([
  3. (1, 144.5, 5.9, 33, 'M'),
  4. (2, 167.2, 5.4, 45, 'M'),
  5. (3, 124.1, 5.2, 23, 'F'),
  6. (4, 144.5, 5.9, 33, 'M'),
  7. (5, 133.2, 5.7, 54, 'F'),
  8. (3, 124.1, 5.2, 23, 'F'),
  9. (5, 129.2, 5.3, 42, 'M'),
  10. ], ['id', 'weight', 'height', 'age', 'gender'])

 (2)从字典创建

  1. df = spark.createDataFrame([{ 'name':'Alice','age':1},
  2. { 'name':'Polo','age':1}])

 (3)指定schema创建

  1. schema = StructType([
  2. StructField("id", LongType(), True),
  3. StructField("name", StringType(), True),
  4. StructField("age", LongType(), True),
  5. StructField("eyeColor", StringType(), True)
  6. ])
  7. df = spark.createDataFrame(csvRDD, schema)

 (4)读文件创建

  1. airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')

 (5)从pandas dataframe创建

  1. import pandas as pd
  2. from pyspark.sql import SparkSession
  3. colors = ['white','green','yellow','red','brown','pink']
  4. color_df=pd.DataFrame(colors,columns=['color'])
  5. color_df['length']=color_df['color'].apply(len)
  6. color_df=spark.createDataFrame(color_df)
  7. color_df.show()

DF的架构查看

  1. df.printSchema()

describe统计属性

  1. # 例如特定列中的行总数,其均值,标准差,列的最小值和最大值/空值计数
  2. df.describe(["age"]).show()

dtypes查看字段类型

  1. # 查看列的类型 ,同pandas
  2. color_df.dtypes
  3. # [('color', 'string'), ('length', 'bigint')]

查看列名/行数

  1. # 查看有哪些列 ,同pandas
  2. df.columns
  3. # ['color', 'length']
  4. # 行数
  5. df.count()
  6. # 列数
  7. len(df.columns)

distinct查找列唯一值

  1. df.select('id').distinct()
  2. .rdd.map(lambda r: r[0]).collect()

show显示

  1. # show和head函数显示数据帧的前N行
  2. df.show(5)
  3. df.head(5)

统计分析

 (1)频繁项目

  1. # 查找每列出现次数占总的30%以上频繁项目
  2. df.stat.freqItems(["id", "gender"], 0.3).show()
  3. +------------+----------------+
  4. |id_freqItems|gender_freqItems|
  5. +------------+----------------+
  6. | [5, 3]| [M, F]|
  7. +------------+----------------+

 (2)交叉表

  1. # 分组统计,交叉分析
  2. # 计算给定列的成对频率表
  3. df.crosstab('Age', 'Gender').show()
  4. Output:
  5. +----------+-----+------+
  6. |Age_Gender| F| M|
  7. +----------+-----+------+
  8. | 0-17| 5083| 10019|
  9. | 46-50|13199| 32502|
  10. | 18-25|24628| 75032|
  11. | 36-45|27170| 82843|
  12. | 55+| 5083| 16421|
  13. | 51-55| 9894| 28607|
  14. | 26-35|50752|168835|
  15. +----------+-----+------

select选择和切片筛选

 (1)列的选择

  1. # 选择一列的几种方式,比较麻烦,不像pandas直接用df['cols']就可以了
  2. # 需要在filter,select等操作符中才能使用
  3. color_df.select('length').show()
  4. color_df.select(color_df.length).show()
  5. color_df.select(color_df[0]).show()
  6. color_df.select(color_df['length']).show()
  7. color_df.filter(color_df['length']>=4).show() # filter方法

 (2)选择几列的方法

  1. color_df.select('length','color').show()
  2. # 如果是pandas,似乎要简单些
  3. color_df[['length','color']]

 (3)多列选择和切片

  1. # 3.多列选择和切片
  2. color_df.select('length','color')
  3. .select(color_df['length']>4).show()

 (4)between 范围选择

  1. color_df.filter(color_df.length.between(4,5) )
  2. .select(color_df.color.alias('mid_length')).show()

 (5)联合筛选

  1. # 这里使用一种是 color_df.length, 另一种是color_df[0]
  2. color_df.filter(color_df.length>4)
  3. .filter(color_df[0]!='white').show()

 (6)filter运行类SQL

  1. color_df.filter("color='green'").show()
  2. color_df.filter("color like 'b%'").show()

 (7)where方法的SQL

  1. color_df.where("color like '%yellow%'").show()

 (8)直接使用SQL语法

  1. # 首先dataframe注册为临时表,然后执行SQL查询
  2. color_df.createOrReplaceTempView("color_df")
  3. spark.sql("select count(1) from color_df").show()

drop删除一列

  1. # 删除一列
  2. color_df.drop('length').show()
  3. # pandas写法
  4. df.drop(labels=['a'],axis=1)

withColumn新增/修改列

  1. withColumn(colName, col)
  2. 通过为原数据框添加一个新列或替换已存在的同名列而返回一个新数据框。colName —— 是一个字符串, 为新列的名字。必须是已存在的列的名字
  3. col —— 为这个新列的 Column 表达式。必须是含有列的表达式。如果不是它会报错 AssertionError: col should be Column

 (1)新增一列

  1. # 列名可以是原有列,也可以是新列
  2. df.withColumn('page_count', df.page_count+100)
  3. df.withColumn('new_page_count', df.page_count+100)

 (2)lit新增一列常量

  1. # lit新增一列常量
  2. import pyspark.sql.functions as F
  3. df = df.withColumn('mark', F.lit(1))

withColumnRenamed更改列名:

 (1)直接修改

  1. # 修改单个列名
  2. new_df = df.withColumnRenamed('old_name', 'new_name')

 (2)聚合后修改

  1. 一、withColumnRenamed()方式修改列名:
  2. # 重新命名聚合后结果的列名(需要修改多个列名就跟多个:withColumnRenamed)
  3. # 聚合之后不修改列名则会显示:count(member_name)
  4. df_res.agg({ 'member_name': 'count', 'income': 'sum', 'num': 'sum'})
  5. .withColumnRenamed("count(member_name)", "member_num").show()
  6. 二、利用pyspark.sql中的functions修改列名:
  7. from pyspark.sql import functions as F
  8. df_res.agg(
  9. F.count('member_name').alias('mem_num'),
  10. F.sum('num').alias('order_num'),
  11. F.sum("income").alias('total_income')
  12. ).show()

cast修改列数据类型

  1. from pyspark.sql.types import IntegerType
  2. # 下面两种修改方式等价
  3. df = df.withColumn("height", df["height"].cast(IntegerType()))
  4. df = df.withColumn("weight", df.weight.cast('int'))
  5. print(df.dtypes)

sort排序

 (1)单字段排序

  1. # spark排序
  2. color_df.sort('color',ascending=False).show()
  3. # pandas的排序
  4. df.sort_values(by='b')

 (2)多字段排序

  1. color_df.filter(color_df['length']>=4)
  2. .sort('length', 'color', ascending=False).show()

 (3)混合排序

  1. color_df.sort(color_df.length.desc(),color_df.color.asc())
  2. .show()

 (4)orderBy排序

  1. color_df.orderBy('length','color').show()

#toDF

  1. toDF(*cols)
  2. Parameters:
  3. cols list of new column names (string)
  4. # 返回具有新指定列名的DataFrame
  5. df.toDF('f1', 'f2')

DF与RDD互换

  1. rdd_df = df.rdd # DF转RDD
  2. df = rdd_df.toDF() # RDD转DF

DF和Pandas互换

  1. pandas_df = spark_df.toPandas()
  2. spark_df = sqlContext.createDataFrame(pandas_df)

union合并+去重:

  1. nodes_cust = edges.select('tx_ccl_id', 'cust_id') # 客户编号
  2. nodes_cp = edges.select('tx_ccl_id', 'cp_cust_id') # 交易对手编号
  3. nodes_cp = nodes_cp.withColumnRenamed('cp_cust_id', 'cust_id') # 统一节点列名
  4. nodes = nodes_cust.union(nodes_cp).dropDuplicates(['cust_id'])

count行数/列数

  1. # 行数
  2. df.count()
  3. # 列数
  4. len(df.columns)

缺失值

 (1)计算列中的空值数目

  1. # 计算一列空值数目
  2. df.filter(df['col_name'].isNull()).count()
  3. # 计算每列空值数目
  4. for col in df.columns:
  5. print(col, "\t", "with null values: ",
  6. df.filter(df[col].isNull()).count())

 (2)删除有缺失值的行

  1. # 1、删除有缺失值的行
  2. df2 = df.dropna()
  3. # 2、或者
  4. df2 = df.na.drop()

 (3)平均值填充缺失值

  1. from pyspark.sql.functions import when
  2. import pyspark.sql.functions as F
  3. # 计算各个数值列的平均值
  4. def mean_of_pyspark_columns(df, numeric_cols):
  5. col_with_mean = []
  6. for col in numeric_cols:
  7. mean_value = df.select(F.avg(df[col]))
  8. avg_col = mean_value.columns[0]
  9. res = mean_value.rdd.map(lambda row: row[avg_col]).collect()
  10. col_with_mean.append([col, res[0]])
  11. return col_with_mean
  12. # 用平均值填充缺失值
  13. def fill_missing_with_mean(df, numeric_cols):
  14. col_with_mean = mean_of_pyspark_columns(df, numeric_cols)
  15. for col, mean in col_with_mean:
  16. df = df.withColumn(col, when(df[col].isNull() == True, F.lit(mean)).otherwise(df[col]))
  17. return df
  18. if __name__ == '__main__':
  19. # df需要自行创建
  20. numeric_cols = ['age2', 'height2'] # 需要填充空值的列
  21. df = fill_missing_with_mean(df, numeric_cols) # 空值填充
  22. df.show()

替换值

 (1)replace 全量替换

  1. # 替换pyspark dataframe中的任何值,而无需选择特定列
  2. df = df.replace'?'None
  3. df = df.replace'ckd \t''ckd'

 (2)functions 部分替换

  1. # 只替换特定列中的值,则不能使用replace.而使用pyspark.sql.functions
  2. # 用classck的notckd替换no
  3. import pyspark.sql.functions as F
  4. df = df.withColumn('class',
  5. F.when(df['class'] == 'no', F.lit('notckd'))
  6. .otherwise(df['class']))

groupBy + agg 聚合

 (1)agg

  1. agg(self, *exprs)计算聚合并将结果返回为:`DataFrame`
  2. 可用的聚合函数有“avg”、“max”、“min”、“sum”、“count”。
  3. param exprs:从列名(字符串)到聚合函数(字符串)的dict映射,
  4. 或:类:`Column`的列表。
  5. # 官方接口示例
  6. >>> gdf = df.groupBy(df.name)
  7. >>> sorted(gdf.agg({ "*": "count"}).collect())
  8. [Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]
  9. >>> from pyspark.sql import functions as F
  10. >>> sorted(gdf.agg(F.min(df.age)).collect())
  11. [Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]

 (2)sum

  1. # 获得两列总分数和总人数,groupBy可以根据多列分组
  2. df = df.groupBy('anchor_id')
  3. .agg({ "live_score": "sum", "live_comment_count": "sum"})
  4. .withColumnRenamed("sum(live_score)", "total_score")
  5. .withColumnRenamed("sum(live_comment_count)", "total_people")

 (3)avg

  1. # avg方法计算平均得分
  2. df = df.groupBy("course_id")
  3. .agg({ "score": "avg"})
  4. .withColumnRenamed("avg(score)", "avg_score")

 (4)count

  1. # count方法计算资源个数
  2. df = df.groupBy("course_id")
  3. .agg({ "comment": "count"})
  4. .withColumnRenamed("count(comment)", "comment_count")

## (5)max/min

  1. # max取最大值min取最小值
  2. df = df.groupBy("org_id")
  3. .agg({ "publish_date": "max"})
  4. .withColumnRenamed("max(publish_date)", "active_time")

 (6)collect_list()

  1. # collect_list()将groupBy的数据处理成列表
  2. from pyspark.sql import functions as F
  3. edges.show()
  4. df = edges.groupBy("tx_ccl_id").agg(F.collect_list("cust_id"))
  5. .withColumnRenamed("collect_list(cust_id)", "comment_list")
  6. df.show()
  7. # edge.show():结果
  8. +-------+----------+-----------+---------+
  9. |cust_id|cp_cust_id|drct_tx_amt|tx_ccl_id|
  10. +-------+----------+-----------+---------+
  11. | 18| 62| 7646.5839| 0|
  12. | 88| 41| 7683.6484| 0|
  13. | 90| 68| 16184.5801| 0|
  14. | 95| 5| 11888.3697| 0|
  15. ………………………………………………………………………………………………………………
  16. # df.show():结果
  17. +---------+---------------------+
  18. |tx_ccl_id|collect_list(cust_id)|
  19. +---------+---------------------+
  20. | 0| [18, 88, 90, 95, ...|
  21. | 1| [1077, 1011, 1004...|

join 连接

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join\#pyspark.sql.DataFrame.join

  1. join(other, on=None, how=None)
  2. # 使用给定的连接表达式与另一个DataFrame连接
  3. Parameters
  4. other 连接的右端
  5. on 联接列名的字符串、列名列表、联接表达式(列)或列列表。如果on是指示联接列名称的字符串或字符串列表,则该列必须存在于两边,这将执行equi联接。
  6. how str, 默认inner连接. 必须是以下的其中一个: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.

 (1)根据单个字段连接

  1. # 外连接
  2. >>> df.join(df2, df.name == df2.name, 'outer')
  3. .select(df.name, df2.height).collect()
  4. [Row(name=None, height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]
  5. # 外连接(与上个示例等价)
  6. >>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
  7. [Row(name='Tom', height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]

 (2)根据多个字段连接

  1. # 根据多个字段连接
  2. >>> cond = [df.name == df3.name, df.age == df3.age]
  3. >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
  4. [Row(name='Alice', age=2), Row(name='Bob', age=5)]
  5. # 根据多个字段连接(与上个示例等价)
  6. >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
  7. [Row(name='Bob', age=5)]

差集/并集/交集

  1. # 创建数据
  2. df = spark.createDataFrame((
  3. (1, "asf"),
  4. (2, "2143"),
  5. (3, "rfds")
  6. )).toDF("label", "sentence")
  7. df2 = spark.createDataFrame((
  8. (1, "asf"),
  9. (2, "2143"),
  10. (4, "f8934y")
  11. )).toDF("label", "sentence")

 (1)差集

  1. newDF = df.select("sentence")
  2. .subtract(df2.select("sentence"))
  3. newDF.show()
  4. +--------+
  5. |sentence|
  6. +--------+
  7. | f8934y|
  8. +--------+

 (2)交集

  1. newDF = df.select("sentence")
  2. .intersect(df2.select("sentence"))
  3. newDF.show()
  4. +--------+
  5. |sentence|
  6. +--------+
  7. | asf|
  8. | 2143|
  9. +--------+

 (3)并集

  1. newDF = df.select("sentence")
  2. .union(df2.select("sentence"))
  3. newDF.show()
  4. +--------+
  5. |sentence|
  6. +--------+
  7. | asf|
  8. | 2143|
  9. | f8934y|
  10. | asf|
  11. | 2143|
  12. | rfds|
  13. +--------+

 (4)并集+去重

  1. newDF = df.select("sentence")
  2. .union(df2.select("sentence")).distinct()
  3. newDF.show()
  4. +--------+
  5. |sentence|
  6. +--------+
  7. | rfds|
  8. | asf|
  9. | 2143|
  10. | f8934y|
  11. +--------+

UDF自定义函数

  1. # 创建用户自定义函数
  2. # UDF对表中的每一行进行函数处理,返回新的值
  3. udf(f=None, returnType=StringType)
  4. Parameters
  5. f python函数(如果用作独立函数)
  6. returnType 用户定义函数的返回类型。
  7. 示例如下:
  8. from pyspark.sql.functions import udf
  9. from pyspark.sql.types import IntegerType, StringType
  10. # 自定义函数1
  11. def to_upper(s):
  12. if s is not None:
  13. return s.upper()
  14. # 自定义函数2
  15. def add_one(x):
  16. if x is not None:
  17. return x + 1
  18. # 注册udf函数
  19. slen_udf = udf(lambda s: len(s), IntegerType())
  20. to_upper_udf = udf(to_upper, StringType())
  21. add_one_udf = udf(add_one, IntegerType())
  22. if __name__ == '__main__':
  23. df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
  24. # 法一:
  25. # df.select(slen("name").alias("slen(name)"),
  26. to_upper_udf(df["name"]),
  27. add_one_udf(df["age"])).show() # 与下面一句等价
  28. df.select(slen_udf("name").alias("slen(name)"),
  29. to_upper_udf("name"),
  30. add_one_udf("age")).show()
  31. # 法二:
  32. df = df.withColumn("slen(name)", slen_udf(df["name"]))
  33. df = df.withColumn("age", add_one_udf(df["age"]))
  34. df = df.withColumn("name", to_upper_udf("name"))
  35. df.show()

explode分割

  1. # 为给定数组或映射中的每个元素返回一个新行
  2. from pyspark.sql.functions import split, explode
  3. df = sc.parallelize([(1, 2, 3, 'a b c'),
  4. (4, 5, 6, 'd e f'),
  5. (7, 8, 9, 'g h i')])
  6. .toDF(['col1', 'col2', 'col3', 'col4'])
  7. df.withColumn('col4', explode(split('col4', ' '))).show()
  8. +----+----+----+----+
  9. |col1|col2|col3|col4|
  10. +----+----+----+----+
  11. | 1| 2| 3| a|
  12. | 1| 2| 3| b|
  13. | 1| 2| 3| c|
  14. | 4| 5| 6| d|
  15. | 4| 5| 6| e|
  16. | 4| 5| 6| f|
  17. | 7| 8| 9| g|
  18. | 7| 8| 9| h|
  19. | 7| 8| 9| i|
  20. +----+----+----+----+
  21. # 示例二
  22. from pyspark.sql import Row
  23. from pyspark.sql.functions import explode
  24. eDF = spark.createDataFrame([Row(
  25. a=1,
  26. intlist=[1, 2, 3],
  27. mapfield={ "a": "b"})])
  28. eDF.select(explode(eDF.intlist).alias("anInt")).show()
  29. +-----+
  30. |anInt|
  31. +-----+
  32. | 1|
  33. | 2|
  34. | 3|
  35. +-----+

DF和python变量互转

 在sparkSQL编程的时候,经常需要获取DataFrame的信息,然后python做其他的判断或计算,比如获取dataframe的行数以判断是否需要等待,获取dataframe的某一列或第一行信息以决定下一步的处理,等等。

  1. ## (1)获取第一行的值
  2. ```python
  3. # 获取第一行的值,返回普通python变量
  4. # 由于 first() 返回的是 Row 类型,可以看做是dict类型,
  5. # 在只有一列的情况下可以用 [0] 来获取值。
  6. value = df.select('columns_name').first()[0]

 (2)获取第一行的多个值

  1. #获取第一行的多个值,返回普通python变量
  2. # first() 返回的是 Row 类型,可以看做是dict类型,用 row.col_name 来获取值
  3. row = df.select('col_1', 'col_2').first()
  4. col_1_value = row.col_1
  5. col_2_value = row.col_2

 (3)获取一列/多列的所有值

  1. # 获取一列的所有值,或者多列的所有值
  2. # collect()函数将分布式的dataframe转成local类型的 list-row格式
  3. rows= df.select('col_1', 'col_2').collect()
  4. value = [[ row.col_1, row.col_2 ] for row in rows ]

不常用的一些

 (1)getField

  1. # 在StructField中通过名称获取字段。
  2. from pyspark.sql import Row
  3. df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
  4. df.select(df.r.getField("b")).show() # 用法与下面等价
  5. df.select(df.r.a).show()

 (2)isNotNull

  1. # 如果当前表达式不为空,则为true
  2. from pyspark.sql import Row
  3. df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
  4. df.filter(df.height.isNotNull()).collect()
  5. # [Row(height=80, name='Tom')]

 (3)isNull

  1. # 如果当前表达式为空,则为true
  2. from pyspark.sql import Row
  3. df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
  4. df.filter(df.height.isNull()).collect()
  5. # [Row(height=None, name='Alice')]

 (4)isin

  1. # 如果自变量的求值包含该表达式的值,则该表达式为true
  2. df[df.name.isin("Bob", "Mike")].collect()
  3. # [Row(age=5, name='Bob')]
  4. df[df.age.isin([1, 2, 3])].collect()
  5. # [Row(age=2, name='Alice')]

 (5)like

  1. # Column根据SQL LIKE匹配返回布尔值。
  2. df.filter(df.name.like('Al%')).collect()
  3. # [Row(age=2, name='Alice')]

 (6)otherwise

  1. otherwise(value)
  2. # 计算条件列表,并返回多个可能的结果表达式之一,如果otherwise()未调用,则为不匹配的条件返回None
  3. from pyspark.sql import functions as F
  4. df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
  5. +-----+-------------------------------------+
  6. | name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
  7. +-----+-------------------------------------+
  8. |Alice| 0|
  9. | Bob| 1|
  10. +-----+-------------------------------------+

 (7)when

  1. when(condition, value)
  2. Parameters:
  3. condition 布尔Column表达式
  4. value 文字值或Column表达式
  5. # 计算条件列表,并返回多个可能的结果表达式之一.如果otherwise()未调用,则为不匹配的条件返回None
  6. from pyspark.sql import functions as F
  7. >>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
  8. +-----+------------------------------------------------------------+
  9. | name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
  10. +-----+------------------------------------------------------------+
  11. |Alice| -1|
  12. | Bob| 1|
  13. +-----+------------------------------------------------------------+

后续需要整理学习:

PySpark SQL常用语法:https://www.jianshu.com/p/177cbcb1cb6f
PySpark︱DataFrame操作指南:增/删/改/查/合并/统计与数据处理:
https://blog.csdn.net/sinat\_26917383/article/details/80500349?depth\_1-utm\_source=distribute.pc\_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2&utm\_source=distribute.pc\_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2

参考文章:

pyspark官方文档:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html\#pyspark.sql.DataFrame
关于spark的博客集合:https://blog.csdn.net/qq\_32023541/column/info/19893
pyspark配置config:https://www.cnblogs.com/Tw1st-Fate/p/11094344.html

DataFrame基础:https://blog.csdn.net/suzyu12345/article/details/79673493
DataFrame:https://www.jianshu.com/p/cb0fec7a4f6d
列累积求和:https://blog.csdn.net/XnCSD/article/details/90676259
dataframe,排序并排名:https://blog.csdn.net/a1272899331/article/details/90268141

pyspark sql使用总结:https://blog.csdn.net/weixin\_44053979/article/details/89296224
pyspark 分组取前几个:https://blog.csdn.net/weixin\_40161254/article/details/88817225
Dataframe使用的坑 与 经历:https://cloud.tencent.com/developer/article/1435995
Pandas 和 PySpark 的 DataFrame 相互转换:http://fech.in/2018/pyspark\_and\_pandas/
读写dataframe:https://blog.csdn.net/suzyu12345/article/details/79673473\#31-%E5%86%99%E5%88%B0csv
DataFrame操作指南:增/删/改/查/合并/统计与数据处理:
https://blog.csdn.net/sinat\_26917383/article/details/80500349?depth\_1-utm\_source=distribute.pc\_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2&utm\_source=distribute.pc\_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2

发表评论

表情:
评论列表 (有 0 条评论,1627人围观)

还没有评论,来说两句吧...

相关阅读

    相关 笔记笔记

    未初始化的全局变量存储在BBS(未初始化全局变量存储区); 初始化的全局变量存储在全局变量存储区(data); 局部变量存储在栈 new malloc开辟的存储在堆 代

    相关 java笔记,Java笔记

    Java笔记 实例方法实例变量 实例方法 方法不带'static'关键字,为实例方法,实例方法必须先创建对象,通过“引用.”点方式调用。 注: ·带static的方法

    相关 笔记

    数据流图用来说明: 业务处理过程 系统边界内所包含的功能 系统中的数据流 流程图展示应用程序:从数据出入开始到获得输出为止的逻辑过程,描述处理过程的控制流

    相关 笔记

    一、.psd文件只能用PS打开,一般用拖拽到软件的方式; 二、文件保留:     文件格式:PSD 保留图层信息;需要专用的插件才能看到缩略图;         

    相关 笔记

    href=”javascript:void(0);”这个的含义是,让超链接去执行一个js函数,而不是去跳转到一个地址, 而void(0)表示一个空的方法,也就是不执行js函