Spark操作——转换操作(一) 2023-03-13 15:27 142阅读 0赞 1. **基础转换操作** 2. **键值转换操作** ### 基础转换操作 ### * map\[U\](f:(T)=>U):RDD\[U\] 对RDD中的每个元素都应用一个指定的函数,以此产生一个新的RDD scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt") rdd: org.apache.spark.rdd.RDD[String] = /Users/lyf/Desktop/test/data1.txt MapPartitionsRDD[13] at textFile at <console>:24 scala> rdd.map(line => line.split(" ")).collect res16: Array[Array[String]] = Array(Array(Hello, World), Array(Hello, Tom), Array(Hello, Jerry)) * distince():RDD\[(T)\] 去除RDD中重复的元素,返回所有元素不重复的RDD scala> var rdd = sc.parallelize(List(1,2,2,3,3,3,4,5)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24 scala> rdd.distinct.collect res18: Array[Int] = Array(4, 1, 5, 2, 3) * distince(numPartions: Int):RDD\[T\] scala> var rdd = sc.parallelize(List(1,2,2,3,3,3,4,5)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24 scala> var rddDistinct = rdd.distinct rddDistinct: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at distinct at <console>:25 scala> rddDistinct.partitions.size res21: Int = 4 scala> var rddDistinct = rdd.distinct(3) rddDistinct: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at distinct at <console>:25 scala> rddDistinct.partitions.size res22: Int = 3 * flatMap\[U\](f:(T)=>TraversableOnce\[U\]):RDD\[U\] scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt") rdd: org.apache.spark.rdd.RDD[String] = /Users/lyf/Desktop/test/data1.txt MapPartitionsRDD[32] at textFile at <console>:24 scala> rdd.flatMap(line => line.split(" ")).collect res23: Array[String] = Array(Hello, World, Hello, Tom, Hello, Jerry) * coalesce(numPartitions:Int, shuffle:Boolean=false):RDD\[T\] * repartition(numPartitions:Int):RDD\[T\] 两者都是对RDD进行重新分区。coalesce使用HashPartitioner进行分区,第一个参数为重分区数,第二个为是否进行shuffle,默认为false。repartition是coalesce操作shuffle为true的封装。 scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt") rdd: org.apache.spark.rdd.RDD[String] = /Users/lyf/Desktop/test/data1.txt MapPartitionsRDD[35] at textFile at <console>:24 scala> rdd.partitions.size res24: Int = 2 scala> var rdd_1 = rdd.coalesce(1) rdd_1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[36] at coalesce at <console>:25 // 如果分区数大于原来的分区数,则第二个参数必须要true,否则分区数不变 scala> var rdd_2 = rdd.coalesce(3) rdd_2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[37] at coalesce at <console>:25 scala> rdd_2.partitions.size res26: Int = 2 scala> var rdd_2 = rdd.coalesce(5, true) res37: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5), Array(6, 7), Array(8, 9, 10)) rdd_2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at coalesce at <console>:25 scala> rdd_2.partitions.size res28: Int = 5 scala> var rdd_3 = rdd.repartition(5) rdd_3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[46] at repartition at <console>:25 scala> rdd_3.partitions.size res29: Int = 5 * randomSplit(weights: Array\[Double\], seed: Long=Utils.random.nextLong):Array\[RDD\[T\]\] 根据weights权重将一个RDD分割为多个RDD,组成RDD数组,权重越高,被划分到的概率就越大。 scala> var rdd = sc.parallelize(1 to 10, 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24 // 将原RDD按照weights权重生成一个新的RDD数组 scala> var rddSplit = rdd.randomSplit(Array(1.0, 2.0, 3.0, 4.0)) rddSplit: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[48] at randomSplit at <console>:25, MapPartitionsRDD[49] at randomSplit at <console>:25, MapPartitionsRDD[50] at randomSplit at <console>:25, MapPartitionsRDD[51] at randomSplit at <console>:25) scala> rddSplit.size res30: Int = 4 scala> rddSplit(0).collect res31: Array[Int] = Array() scala> rddSplit(1).collect res32: Array[Int] = Array(3, 8) scala> rddSplit(2).collect res33: Array[Int] = Array(1, 2, 9) scala> rddSplit(3).collect res34: Array[Int] = Array(4, 5, 6, 7, 10) * glom():RDD\[Array\[T\]\] 将RDD中每一个分区中所有类型为T的数据转变为元素类型为T的数组\[Array\[T\]\] scala> var rdd = sc.parallelize(1 to 10, 4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at parallelize at <console>:24 scala> rdd.collect res36: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> rdd.glom().collect res37: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5), Array(6, 7), Array(8, 9, 10)) * union(other: RDD\[T\]): RDD\[T\] 返回两个RDD的并集,元素不进行去重 scala> var rdd1 = sc.makeRDD(1 to 3, 1) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at makeRDD at <console>:24 scala> var rdd2 = sc.makeRDD(2 to 5, 1) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at makeRDD at <console>:24 scala> rdd1.union(rdd2).collect res38: Array[Int] = Array(1, 2, 3, 2, 3, 4, 5) * intersection(other: RDD\[T\]): RDD\[T\] * intersection(other: RDD\[T\], numPartitions:Int):RDD\[T\] * intersection(other: RDD\[T\], partitioner: Partitioner): RDD\[T\] 返回两个RDD的交集,元素不进行不去重。参数numPartitions指定分区数,参数partitioner指定分区函数 scala> rdd1.intersection(rdd2).collect res39: Array[Int] = Array(3, 2) * subtract(other: RDD\[T\]): RDD\[T\] * subtract(other: RDD\[T\], numPartitions:Int): RDD\[T\] * subtract(other: RDD\[T\], p: Partitioner): RDD\[T\] 返回两个RDD的差集,元素不进行去重 scala> rdd1.subtract(rdd2).collect res40: Array[Int] = Array(1) 参考: \[1\] 郭景瞻. 图解Spark:核心技术与案例实战\[M\]. 北京:电子工业出版社, 2017.
相关 Spark操作——转换操作(一) 1. 基础转换操作 2. 键值转换操作 基础转换操作 map\[U\](f:(T)=>U):RDD\[U\] 对RDD中的每个元素都应用一个指定的函数,以此 朱雀/ 2023年03月13日 15:27/ 0 赞/ 143 阅读
相关 Python操作spark 本文的内容参考[Spark编程基础(Python版) 厦门大学 林子雨][Spark_Python_ _] 在学习下面之前最好先理解Python原生的map函数和reduc 灰太狼/ 2022年12月27日 04:47/ 0 赞/ 59 阅读
相关 Flink之DataSet转换操作(一) 目录 (1)Map详解 (2)FlatMap详解 (3)Map优化之MapPartition详解 (4)Filter 逃离我推掉我的手/ 2022年11月06日 05:44/ 0 赞/ 65 阅读
相关 Spark ML特征提取、转换和选择操作详解 一、特征的提取 1、TF-IDF(词频-逆向文档频率) TF(词频):HashingTF与CountVectorizer用于生成词频TF向量。Hashing 港控/mmm°/ 2022年10月30日 12:28/ 0 赞/ 77 阅读
相关 Spark基本操作 概念 Transformation: 根据已有RDD创建新的RDD数据集build (1)map(func):对调用map的RDD数据集中的每个element都使 小鱼儿/ 2022年10月01日 06:42/ 0 赞/ 40 阅读
相关 Spark—聚合操作—combineByKey 聚合操作——combineByKey 当数据集一键值对形式组织的时候,聚合具有相同键的元素进行一些统计是很常见的操作。对于Pair RDD常见的聚合操作如:reduceB 蔚落/ 2022年07月27日 13:43/ 0 赞/ 114 阅读
相关 Spark——RDD操作详解 一、基本RDD 1、针对各个元素的转化操作 最常用的转化操作是map()和filter()。转化操作map()J接收一个函数,把这个函数用于RDD中的每一个元素,将函数 水深无声/ 2022年07月14日 00:30/ 0 赞/ 125 阅读
相关 spark操作列表 Action 操作 1、 collect() ,返回值是一个数组,返回dataframe集合所有的行 2、 collectAsList() 返回值是一个java类型的数 不念不忘少年蓝@/ 2022年02月12日 12:21/ 0 赞/ 150 阅读
相关 Spark:常用transformation 转换操作 及action 行动操作 一、常用transformation介绍 <table> <thead> <tr> <th>操作</th> <th>介绍</th> </t 痛定思痛。/ 2021年09月25日 03:32/ 0 赞/ 308 阅读
相关 Spark算子:RDD键值转换操作–combineByKey、foldByKey 关键字:Spark算子、Spark RDD键值转换、combineByKey、foldByKey combineByKey def combineByKey\[C\]( 我不是女神ヾ/ 2021年09月25日 01:24/ 0 赞/ 256 阅读
还没有评论,来说两句吧...