Spark 算子详解,常用算子汇总

从大方向来说,Spark 算子大致可以分为以下两类:
	1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。
	2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
从小方向来说,Spark 算子大致可以分为以下三类:
	1)Value数据类型的Transformation算子  
	2)Key-Value数据类型的Transfromation算子
	3)Action算子

Map 类算子
1、map、flatMap、mapValues、flatMapValues 底层都是调用了MapPartitionsRDD 算子

参考文献:
  MapPartitionsRDD 参数 preservesPartitioning 的原因  减少shuffle 次数 (是否继承父类分区 true  or false)
// 1、map算子
// 2、flatMap算子
// 同基本转换操作中的map,只不过mapValues/flatMapValues是针对[K,V]中的V值进行map/flatMap操作。
// 3、mapValues算子
val rdd3=sc.parallelize(List((1,2),(3,4)))
rdd3.mapValues(_.to(5)).collect().foreach(println(_))
结果:
(1,Range(2, 3, 4, 5))
(3,Range(4, 5))

// 4、flatMapValues算子
val rdd3=sc.parallelize(List((1,2),(3,4)))
rdd3.flatMapValues(_.to(5)).collect().foreach(println(_))
结果:
(1,2)
(1,3)
(1,4)
(1,5)
(3,4)
(3,5)

2、glom算子

 => 作用:将同一个分区里的元素合并到一个array里
// glom算子
val listArray = Array[String]("admin 1","user0 2","user1 3","user2 4")
val listRdd: RDD[String] = session.sparkContext.makeRDD(listArray)
val glomRDD: RDD[Array[String]] = listRdd.glom()
结果:
RDD(Array(admin 1, user0 2), Array(user1 3, user2 4))

3、groupByKey 算子

groupByKey 底层是调用了 combineByKeyWithClassTag 函数,从源码中我们可以看到 如果 self.partitioner == Some(partitioner),
那么 将不会调用 ShuffledRDD 发生shuffle

Shuffle 类算子
1、subtract 算子

	/* subtract相当于进行集合的差操作 */
	val listArray: Array[Int] = Array[Int](5,6,7,8)
    val listRdd: RDD[Int] = session.sparkContext.makeRDD(listArray,3)

    val listArray1: Array[Int] = Array[Int](4,6,7,9)
    val listRddSecond: RDD[Int] = session.sparkContext.makeRDD(listArray1,3)

    val value: RDD[Int] = listRdd.subtract(listRddSecond)
    // 根据分区输出 RDD 中的数据
    value.partitions.foreach (elem => {
      val index: Int = elem.index
      val me = value.mapPartitionsWithIndex ((pid, iter) => {
        if (index == pid) iter else Iterator ()
      })
      println ("分区索引ID : " + index + ",elem : " + me.collect ().toList)
    })
    输出结果:
    分区索引ID : 0,elem : List()
	分区索引ID : 1,elem : List()
	分区索引ID : 2,elem : List(5, 8)

2、sample 算子

/*
	sample算子是用来抽样用的
	withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复
	fraction :抽出多少,这是一个double类型的参数,0-1之间,eg:0.3表示抽出30%
	seed:表示一个种子,根据这个seed随机抽取,一般情况下只用前两个参数就可以,
	那么这个参数作用是什么呢?
	这个参数一般用于调试,有时候不知道是程序出问题还是数据出了问题,就可以将这个参数设置为定值
*/
	val listArray: Array[Int] = Array[Int](5,6,7,8)
    val listRdd: RDD[Int] = session.sparkContext.makeRDD(listArray,3)
    val value: RDD[Int] = listRdd.sample(true,0.5)
    println(value.take(3))
    结果:
    Array(5, 8, 2)
/*
	takeSample()函数和上面的sample函数是一个原理,但是不使用相对比例采样,
	而是按设定的采样个数进行采样,
	同时返回结果不再是RDD,而是相当于对采样后的数据进行Collect(),返回结果的集合为单机的数组。
*/
    val value: Array[Int] = listRdd.takeSample(true,3)
    println(value)
    结果:
    Array(5, 6, 2)

3、Cogroup 算子

/*
	cogroup函数将两个RDD进行协同划分,cogroup函数的定义如下。
  cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器。
  (K, (Iterable[V], Iterable[W]))
  其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元组。
*/
	val listArray: Array[(String,Int)] = Array[(String,Int)](("1",1),("2",2),("7",7),("4",4),("4",4))
    val listRddFir: RDD[(String,Int)] = session.sparkContext.makeRDD(listArray,3)

    val listArray1: Array[(String,Int)] = Array[(String,Int)](("1",1),("2",2),("6",6),("4",4))
    val listRddSec: RDD[(String,Int)] = session.sparkContext.makeRDD(listArray1,3)
    val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = listRddFir.cogroup(listRddSec)
    // 根据分区输出 RDD 中的数据
    value.partitions.foreach (elem => {
      val index: Int = elem.index
      val me = value.mapPartitionsWithIndex ((pid, iter) => {
        if (index == pid) iter else Iterator ()
      })
      println ("分区索引ID : " + index + ",elem : " + me.collect ().toList)
    })
    结果:
    分区索引ID : 0,elem : List((6,(CompactBuffer(),CompactBuffer(6))))
	分区索引ID : 1,elem : List((4,(CompactBuffer(4, 4),CompactBuffer(4))), (7,(CompactBuffer(7),CompactBuffer())), (1,(CompactBuffer(1),CompactBuffer(1))))
	分区索引ID : 2,elem : List((2,(CompactBuffer(2),CompactBuffer(2))))
	
	val value: RDD[(String, Iterable[Int])] = value1.map(line=>{(line._1,line._2._1++line._2._2)})
	结果:
	分区索引ID : 0,elem : List((6,List(6)))
	分区索引ID : 1,elem : List((4,List(4, 4, 4)), (7,List(7)), (1,List(1, 1)))
	分区索引ID : 2,elem : List((2,List(2, 2)))

Action 类算子
1、collectAsMap 算子

/*
collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。 对于重复K的RDD元素,后面的元素覆盖前面的元素。
*/
	val listArray: Array[(String, Int)] = Array[(String, Int)](("1", 1), ("2", 2), ("7", 7), ("4", 4), ("4", 4))
    val listRddFir: RDD[(String, Int)] = session.sparkContext.makeRDD(listArray, 3)
    val stringToInt: collection.Map[String, Int] = listRddFir.collectAsMap()
    println(stringToInt)
    结果:
    Map(2 -> 2, 7 -> 7, 1 -> 1, 4 -> 4)

2、reduceByKeyLocally 算子

/*
实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。
*/
val listArray: Array[(String, Int)] = Array[(String, Int)](("1", 1), ("2", 2), ("7", 7), ("4", 4), ("4", 4))
    val listRddFir: RDD[(String, Int)] = session.sparkContext.makeRDD(listArray, 3)
    val stringToInt: collection.Map[String, Int] = listRddFir.reduceByKeyLocally(_ + _)
    println(stringToInt)
    结果:
    Map(1 -> 1, 2 -> 2, 4 -> 8, 7 -> 7)

3、lookup 算子

/*
lookup(key:K):Seq[V]
Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。 
这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K,V)形成的Seq。 
如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。
*/
	val listArray: Array[(String, Int)] = Array[(String, Int)](("1", 1), ("2", 2), ("7", 7), ("4", 4), ("4", 4))
	val ints: Seq[Int] = listRddFir.lookup("4")
    println(ints)
    结果:
    WrappedArray(4, 4)

4、foldByKey 算子

/*
fold和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一个元素是zeroValue。
*/
	val tuple: (String, Int) = listRddFir.fold(("0", 0))((A, B) => (A._1 + "@" + B._1, A._2 + B._2))
    println(tuple)
    结果:
    (0@0@2@7@0@1@0@4@4,18)
    
	val function: RDD[(String, Int)] = listRddFir.foldByKey(6)(_+_)
    println(function.collectAsMap())
    结果:
    Map(2 -> 8, 7 -> 13, 1 -> 7, 4 -> 14)

5、

/*
aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。
aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。 
而在fold和reduce函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。
函数的定义如下。
aggregate[B](z: B)(seqop: (B,A) => B,combop: (B,B) => B): B
由以上可以看到,
(zeroValue: U)是给定一个初值,后半部分有两个函数,seqOp与combOp。
seqOp相当于是在各个分区里进行的聚合操作,它支持(U,T)=>U,也就是支持不同类型的聚合。
combOp是将seqOp后的结果再进行聚合,此时的结果全部是U类,只能进行同构聚合。
*/
	val listArray: Array[(String, Int)] = Array[(String, Int)](("1", 1), ("2", 2), ("7", 7), ("4", 4), ("4", 4))
    val listRddFir: RDD[(String, Int)] = session.sparkContext.makeRDD(listArray, 3)
	val function: Int = listRddFir.aggregate(1)((A, B)=>B._2, (A, B)=>A+B)
    println(function)
    结果:
    13

你可能感兴趣的