两种类型的算子:transformation和actio

练习

package day07

import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkRDDTest").setMaster("local")

    val sc = new SparkContext(conf)

    // 通过并行化生成rdd
    val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))

    // 对rdd1里的每一个元素乘2然后排序
//    val res1 = rdd1.map(_ * 2).sortBy(x => x,true)
//    println(res1.collect().toBuffer)
    // 过滤出大于等于10的元素
//    val res2 = res1.filter(_ >= 10)

    // 将元素以数组的方式打印出来
//    println(res2.collect().toBuffer)

    val rdd2 = sc.parallelize(Array("a b c","d e f","h i j"))
    // 将rdd2里面的每一个元素先切分再压平
//    val res = rdd2.flatMap(_.split(' '))
//    println(res.collect.toBuffer)

    // 来个复杂的,
    val rdd3 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i j","a a b")))
    // 将rdd3里面的每一个元素先切分再压平
//    val res = rdd3.flatMap(_.flatMap(_.split(" ")))
//    println(res.collect().toBuffer)

    val rdd4 = sc.parallelize(List(5,6,4,3))
    val rdd5 = sc.parallelize(List(1,2,3,4))
    // 求并集
    val unionres = rdd4 union rdd5
//    println(res.collect().toBuffer)
    // 求交集
//    println(rdd4.intersection(rdd5).collect().toBuffer)
    // 去重
//    println(unionres.distinct().collect().toBuffer)

    val rdd6 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
    val rdd7 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
    // 求join
//    println((rdd6 join rdd7).collect().toBuffer)

    // 求左连接和右连接
//    val res1 = rdd6.leftOuterJoin(rdd7)
//    val res2 = rdd6.rightOuterJoin(rdd7)
//    println(res1.collect().toBuffer)
    // 求并集
//    val res = rdd6 union(rdd7)

    // 按key进行分组
//    println(res.groupByKey().collect().toBuffer)

    // 分别用groupByKey和reduceByKey实现单词计数,注意groupByKey与reduceByKey的区别
    // groupByKey
//    println(res.groupByKey().mapValues(_.sum).collect().toBuffer)
    // reduceByKey
//    println(res.reduceByKey(_ + _).collect.toBuffer)

    val rdd8 = sc.parallelize(List(("tom",1),("tom",2),("jerry",3),("kitty",2)))
    val rdd9 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
    // cogroup  注意cogroup与groupByKey的区别
//    println(rdd8.cogroup(rdd9).collect().toBuffer)

    val rdd10 = sc.parallelize(List(1,2,3,4,5))
    // reduce聚合
//    println(rdd10.reduce(_+_))

    val rdd11 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2),("shuke",1)))
    val rdd12 = sc.parallelize(List(("jerry",2),("tom",3),("shuke",2),("kitty",5)))
    val rdd13 = rdd11.union(rdd12)
    // 按key进行聚合
//    reduceByKey


    // 按value的降序排序
    val res = rdd13.reduceByKey(_+_).map(t => (t._2,t._1)).sortByKey(false).map(t => (t._2,t._1))
    println(res.collect.toBuffer)

    // 笛卡尔积
//    println(rdd11.cartesian(rdd12).collect.toBuffer)

    // 其他:count、top、take、first、takeOrdered
    


  }
}

你可能感兴趣的