spark笔记(二)之RDD常用算子

大家好!下面是我在疫情假期期间学习的spark算子笔记,刚刚用了一下午的时间把它整理出来分享给大家!码字实属不易如果对你有帮助,记得点赞呦!

文章目录

  • 一.spark行动算子
  • 二.spark单value类型
  • 三.spark双value类型
  • 四.spark算子KV类型

一.spark行动算子

1.reduce( f: (T, T) => T ):通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。

val list1: RDD[Int] = sc.makeRDD(1 to 10)
val reduceRDD: Int = list1.reduce(_+_)
println(reduceRDD)  //55

2.collect():在驱动程序中,以数组的形式返回数据集的所有元素。

val list1: RDD[Int] = sc.parallelize(List(1,2,3,4,5))
list1.collect().foreach(println)
返回值:Array(1,2,3,4,5)

3.count():返回RDD中元素的个数。

val list1: RDD[Int] = sc.parallelize(List(1,2,3,4,5))
val countRDD: Long = list1.count()
println(countRDD)     // 5

4.first():返回RDD 中的第一个元素。

val list1: RDD[Int] = sc.parallelize(List(1,2,3,4,5))
val firstRDD: Int = list1.first()
println(firstRDD)     // 1

5.take(n:Int):返回一个由RDD的前n个元素组成的数组。

val list1: RDD[Int] = sc.makeRDD(List(7,2,5,6,4,3))
val takeRDD: Array[Int] = list1.take(3)
takeRDD.foreach(println)   //7 2 5

6.takeOrdered(n:Int):返回该RDD排序后的前n个元素组成的数组。

val list1: RDD[Int] = sc.makeRDD(List(7,2,5,6,4,3))
val takeOrderedRDD: Array[Int] = list3.takeOrdered(3)
takeOrderedRDD.foreach(println)   //2 3 4

7. aggregate(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U):先进行每个分区内的计算,然后再进行分区与分区之间的计算。
注意:aggregateByKey在运算的过程中,分区内会把初始值加上,分区间不加!!!
aggregate在运算的过程中,分区内会把初始值加上,分区间也加!!!

val list1: RDD[Int] = sc.makeRDD(1 to 10,2)
val aggregateRDD: Int = list1.aggregate(0)(_+_,_+_)
println(aggregateRDD)   //55
val aggregateRDD1: Int = list1.aggregate(10)(_+_,_+_)
println(aggregateRDD1)    //85  两个分区内每个分区+10,区间再+10

8.fold():aggregate的简化版。aggregate分区内和分区间是相同算法的时候可以使用fold()操作。

val list1: RDD[Int] = sc.makeRDD(1 to 10,2)
val foldRDD: Int = list1.fold(0)(_+_)
println(firstRDD)   //55

9.saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统三者之间区别:保存的数据格式不同。

val list1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
list1.saveAsTextFile("test1")
list1.saveAsSequenceFile("test2")
list1.saveAsObjectFile("test3")

10.foreach():对数组进行遍历,我平时用的是Scala中的foreach(),而这个foreach()是Spark中的。
区别:Scala中的foreach()需要在Executor中执行,而Spark中的foreach()直接在Drvier中执行

二.spark单value类型

1.map( f: A => B ):返回新的RDD。按照传入函数逻辑进行数据转换。

val test: RDD[Int] = sc.makeRDD(1 to 10)
val mapRDD: RDD[Int] = test.map(_*2)      //map被调用了10次
mapRDD.collect().foreach(println)
返回值:Array(2,4,6,8,10,12,14,16,18,20)

2.mapPartitions( f: Iterator[T] => Iterator[U] ):类是于map,但它独立对每一个分区数据进行处理。

有几个分区就调用几次mapPartitions,假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。

val test: RDD[Int] = sc.makeRDD(1 to 10,2)
//map被调用了10次,mapPartitions被调用了2次
val mapPartitionsRDD: RDD[Int] = test.mapPartitions(_.map(_*2)) 
mapPartitionsRDD.collect().foreach(println)
返回值:Array(2,4,6,8,10,12,14,16,18,20)

map()和mapPartition()的区别:

  • map:每次处理一条数据。
  • mapPartition():每次处理一个分区的数据。不足的是这个分区处理完,原RDD中分区的数据才能释放。可能导致OOM(数据溢出)。当内存空间较大的时候建议使用mapPartition(),以提高处理效率。

3.mapPartitionsWithIndex( f: Iterator[T] => Iterator[U] ):类是于mapPartitions,但它多了一个分区的索引值。

val test: RDD[Int] = sc.makeRDD(1 to 8,2)
val mapPartitionsIndex: RDD[(Int, String)] = test.mapPartitionsWithIndex {
  case (num, x) => {
    x.map((_,"分区"+num))
  }
}
mapPartitionsIndex.collect().foreach(println)
返回值:Array((1,分区0),(2,分区0),(3,分区0),(4,分区0),(5,分区1),(6,分区1),(7,分区1),(8,分区1))

4.flatMap(f: T => TraversableOnce[U]):扁平化类似于map。但是传入函数返回应该是一个集合而不是单一的元素。

val test2: RDD[List[Int]] = sc.makeRDD(Array(List(1,2,3),List(4,5,6)))  
val flatmapRDD: RDD[Int] = test2.flatMap(datas => datas)   //接收一个集合,返回一个集合
flatmapRDD.collect().foreach(println)
返回值:Array(1,2,3,4,5,6)

5.glom():将每一个分区形成一个数组,并且形成新的RDD类型。

val test: RDD[Int] = sc.makeRDD(1 to 10,2)
val glomRDD: RDD[Array[Int]] = test1.glom()
//取每个分区的最大值    
glomRDD.collect().foreach( x => {
   println(x.max)
})
返回值:Array(5,10)

6.groupBy( f : T => K ):对数据进行分组。按照传入函数返回值进行分组,分组数据为元组k-v,k表示索引值,v表示分组数据集合

val list: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val groupbyRDD: RDD[(Int, Iterable[Int])] = list.groupBy( x => x%2)  //按照2的整数倍进行分组
groupbyRDD.collect().foreach(println)
返回值:Array((0,CompactBuffer(2, 4),(1,CompactBuffer(1, 3))

7.filter( f: T => Boolean ):对数据进行过滤。满足的留下,不满足的过滤掉。

val list: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val filterRDD: RDD[Int] = list.filter( x => x%2==0)   //按照2的整数倍进行过滤
filterRDD.collect().foreach(println)
返回值:Array(2,4)

8.sample(witdReplacement,fraction,seed):对指定数据进行随机抽样。
witdReplacement:表示抽出的数据是否放回,true放回,false不放回。
fraction:指定的随机种子随机抽样出的数量。(0-1)之间
seed:用于指定随机数生成器种子

val list: RDD[Int] = sc.makeRDD(1 to 10)
val sampleRDD: RDD[Int] = list.sample(false,0.5,1)    
sampleRDD.collect().foreach(println)
返回值:Array(1,5,6,7,8,10)

9.distinct([numPartitions]:Int):对数据进行去重操作,将结果放入几个分区中。numPartitions表示分区的数量,默认为当前分区数。

val list: RDD[Int] = sc.makeRDD(List(1,2,2,3,4,3,3,2,9,10),3)    //默认三个分区
val distinctRDD: RDD[Int] = list.distinct(2)
distinctRDD.collect().foreach(println)
返回值:Array(4,10,2,1,3,9)

10.coalesce(numPartitions:Int,shuffle : Boolean = false):缩减分区,用于大数据集过滤后,提高小数据集的执行效率。

numPartitions:分区数
shuffle:数据是否进行打乱重组,默认为false true为打乱重组缩减分区数,提高执行效率

val list: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
val coalesceRDD: RDD[Int] = list.coalesce(2)
println(coalesceRDD.partitions.size)   //返回值为2

11.repartition(numPartitions:Int):对数据重新分区,重新洗牌所有数据。numPartitions表示分区数。

val list: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
val repartitionRDD: RDD[Int] = list.repartition(2)
println(repartitionRDD.partitions.size)    //返回值为2

coalesce和repartition的区别:

  • coalesce:重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
  • repartition:实际上是调用的coalesce,默认是进行shuffle的。源码如下
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

12.sortBy( f: (T) => K,ascending:Boolean = true ):对数据进行排序。按照函数返回值不同的规则进行排序

f:高阶函数
ascending:默认为顺序true false:倒序

val list: RDD[Int] = sc.makeRDD(List(2,3,5,1,6,7))
val sortByRDD: RDD[Int] = list.sortBy( x => x)
sortByRDD.collect().foreach(println)
返回值:Array(1,2,3,5,6,7)

三.spark双value类型

1.union():对源RDD和参数RDD求并集后返回一个新的RDD。

val list1: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))
val list2: RDD[Int] = sc.makeRDD(List(6,7,8,9,10))
val unionRDD: RDD[Int] = list1.union(list2)
unionRDD.collect().foreach(println)
返回值:Array(1,2,3,4,5,6,7,8,9)

2.subtract():取差集,去除两个RDD中的相同的元素,不同的将保留。

val list1: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))
val list2: RDD[Int] = sc.makeRDD(List(4,5,6,7,8))
val subtractRDD: RDD[Int] = list1.subtract(list2)
subtractRDD.collect().foreach(println)  
返回值:Array(1,2,3)	   // 4和5重复 则显示1 2 3  (只显示本数组)

3.intersection():取交集,去除两个RDD中的不相同的元素,相同的将保留。

val list1: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))
val list2: RDD[Int] = sc.makeRDD(List(4,5,6,7,8))
val intersectionRDD: RDD[Int] = list1.intersection(list2)
intersectionRDD.collect().foreach(println)
返回值:Array(4,5)

4.zip():拉链,两个RDD中的元素进行K-V组合,数据必须一一对应 不然出错每个分区的数据也要一一对应不然出错

val list1: RDD[Int] = sc.parallelize(Array(1,2,3))
val list2: RDD[String] = sc.parallelize(Array("a","b","c"))
val zipRDD: RDD[(Int, String)] = list1.zip(list2)
zipRDD.collect().foreach(println)
返回值:Array((1,a),(2,b),(3,c))

四.spark算子KV类型

1.partitionBy(partitioner: Partitioner):对RDD进行分区 默认分区规则是取余分区。Partitioner可以直接传一个 new HashPartitioner( numPartitions:Int ),详情请查看源码PairRDDFunctions.scala。也可自定义分区器

val list1: RDD[(String, Int)] = sc.makeRDD(List(("aaa",1),("bbb",2),("bbb",3)))
val partitionByRDD: RDD[(String, Int)] = list1.partitionBy(new HashPartitioner(2))  //2个 分区
println(partitionByRDD.partitions.size)    //分区数量为2  分别为0 和 1

2.groupByKey():对每个key进行操作,把相同的key的value放到一个集合当中

val list1: RDD[String] = sc.makeRDD(List("A","A","B","A","C","B","C"))
val groupByKeyRDD: RDD[(String, Iterable[Int])] = list1.map( x => (x,1)).groupByKey()
groupByKeyRDD.collect().foreach(println)
返回值:Array((A,CompactBuffer(1, 1, 1)),(B,CompactBuffer(1, 1)),(C,CompactBuffer(1, 1)))

3.reduceByKey(func, [numTasks]):将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

  • func:高阶函数
  • [numTasks]:value 进行两两相加
val list1: RDD[String] = sc.makeRDD(List("A","A","B","A","C","B","C"))
val mapRDD: RDD[(String, Int)] = list1.map(x => (x,1))
val reduceByKey: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
reduceByKey.collect().foreach(println)
返回值:Array((A,3),(B,2),(C,2))

4.countByKey():表示每一个key对应的元素个数。

val list1: RDD[(Int, Int)] = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
val countByKeyRDD: collection.Map[Int, Long] = list1.countByKey()
countByKeyRDD.foreach(println)   
返回值:Array((3,2),(1,3),(2,1))

5.aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U) :函数的柯里化先进行每个分区内的计算,然后再进行分区与分区之间的计算。

  • zeroValue:初始值;
  • seqOp:函数用于在每一个分区中用初始值逐步迭代value;
  • combOp:函数用于合并每个分区中的结果;
    注意:aggregateByKey在运算的过程中,分区内会把初始值加上,分区间不加!!!
    aggregate在运算的过程中,分区内会把初始值加上,分区间也加!!!
val list1: RDD[(String, Int)] = sc.makeRDD(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
val aggregateByKeyRDD: RDD[(String, Int)] = list1.aggregateByKey(0)(math.max(_,_),_+_)   //取每个分区不同key的最大值,每个分区的key最大值进行相加
aggregateByKeyRDD.collect().foreach(println)    
返回值:Array((b,3),(a,3),(c,12))

6.foldByKey():是aggregateByKey的简化版

val list1: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",1),("c",1),("b",1),("c",1),("c",1)),2)
val foldByKeyRDD: RDD[(String, Int)] = list1.foldByKey(0)(_+_)
foldByKeyRDD.collect().foreach(println) 
返回值:Array((b,1),(a,2),(c,3))

7.sortByKey([ascending], [numTasks]):返回一个按照key进行排序的(K,V)的RDD,就是拿key的值进行排序。

  • [ascending]:true为升序 false为降序 默认是升序
  • [numTasks]:指定分区数
val list1: RDD[(String, Int)] = sc.makeRDD(List(("A" -> 1),("B" -> 2),("C" -> 1),("A" -> 2)))
val sortByKeyRDD: RDD[(String, Int)] = list1.sortByKey()
sortByKeyRDD.collect().foreach(println)
返回值:Array((A,1),(A,2),(B,2),(C,1))

8.mapValues():对每个V进行操作。

val list1: RDD[(Int, String)] = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
val mapValuesRDD: RDD[(Int, String)] = list1.mapValues(_+"*")   //把每个value+"*"
mapValuesRDD.collect().foreach(println)
返回值:Array((1,a*),(1,d*),(2,b*),(3,c*))

9.join(otherDataset, [numTasks]):将两个RDD数据进行相同key的关联

  • otherDataset:关联的另一个RDD
  • [numTasks]:分区数
val list1: RDD[(String, Int)] = sc.makeRDD(List(("A",1),("B",2),("C",3)))
val list2: RDD[(String, String)] = sc.makeRDD(List(("A","a"),("B","b"),("C","c")))
val joinRDD: RDD[(String, (Int, String))] = list1.join(list2)
joinRDD.collect().foreach(println)     
返回值:Array((A,(1,a)),(B,(2,b)),(C,(3,c)))

10.cogroup(otherDataset, [numTasks]):其功能和join类似

val list1: RDD[(String, Int)] = sc.makeRDD(List(("A",1),("B",2),("C",3)))
val list2: RDD[(String, String)] = sc.makeRDD(List(("A","a"),("B","b"),("C","c")))
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = list1.cogroup(list2)
cogroupRDD.collect().foreach(println)     
返回值:Array((A,(CompactBuffer(1),CompactBuffer(a))),(B,(CompactBuffer(2),CompactBuffer(b))),(C,(CompactBuffer(3),CompactBuffer(c))))

你可能感兴趣的