当前位置:首页 > 开发 > 开源软件 > 正文

【Spark八十】Spark RDD API二

发表于: 2015-02-26   作者:bit1129   来源:转载   浏览:
摘要: 本文通过三个可运行的示例(CoGroupTest_05、GroupByTest_06、CollectTest_07)介绍 Spark RDD API 中的 cogroup、groupBy(含自定义 Partitioner)与 collect 算子的用法及输出结果。

coGroup

package spark.examples.rddapi

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object CoGroupTest_05 {
  /**
   * Demonstrates `PairRDDFunctions.cogroup`: for each key present in either
   * RDD, produces `(key, (valuesFromThisRdd, valuesFromOtherRdd))`, where an
   * absent key on one side yields an empty buffer.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf)
    try {
      val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (11, "F"), (81, "Y"), (77, "Z"), (31, "X")), 3)
      val z2 = sc.parallelize(List((4, "ABC"), (6, "B2"), (7, "Z2"), (7, "Z3"), (91, "E"), (11, "FF"), (88, "N"), (77, "S"), (36, "M")), 4)
      // cogroup is added implicitly via PairRDDFunctions.
      // The result is keyed by the union of the keys of the (up to four) RDDs:
      //   (Key, (ValuesOfRDD1Seq, ValuesOfRDD2Seq, ...))
      // cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))]
      // cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]
      val r = z1.cogroup(z2)
      r.collect().foreach(println)
      /* Result:
      (4,(CompactBuffer(),CompactBuffer(ABC)))
      (36,(CompactBuffer(),CompactBuffer(M)))
      (88,(CompactBuffer(),CompactBuffer(N)))
      (81,(CompactBuffer(Y),CompactBuffer()))
      (77,(CompactBuffer(Z),CompactBuffer(S)))
      (9,(CompactBuffer(E),CompactBuffer()))
      (6,(CompactBuffer(B1),CompactBuffer(B2)))
      (11,(CompactBuffer(F),CompactBuffer(FF)))
      (3,(CompactBuffer(A),CompactBuffer()))
      (7,(CompactBuffer(Z1),CompactBuffer(Z2, Z3)))
      (91,(CompactBuffer(),CompactBuffer(E)))
      (31,(CompactBuffer(X),CompactBuffer()))
       */
    } finally {
      // The original example leaked the SparkContext; always stop it so the
      // local backend shuts down cleanly even if the job throws.
      sc.stop()
    }
  }
}

 

groupBy

package spark.examples.rddapi

import org.apache.spark.{Partitioner, SparkContext, SparkConf}

object GroupByTest_06 {
  /**
   * Demonstrates `RDD.groupBy` with a derived key, with the default
   * HashPartitioner, and with a custom [[Partitioner]].
   */
  def main(args: Array[String]): Unit = {
    // Fix: the app name was copy-pasted as "CoGroupTest_05" in the original.
    val conf = new SparkConf().setMaster("local").setAppName("GroupByTest_06")
    val sc = new SparkContext(conf)
    try {
      val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)
      /**
       * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
       * mapping to that key. The ordering of elements within each group is not guaranteed, and
       * may even differ each time the resulting RDD is evaluated.
       *
       * Note: This operation may be very expensive. If you are grouping in order to perform an
       * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
       * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
       */
      //  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =  groupBy[K](f, defaultPartitioner(this))
      // Group by a computed key: the result elements are (K, V) pairs where K
      // is the grouping function's return value and V is the group's members.
      val r = z1.groupBy(x => if (x._1 % 2 == 0) "even" else "odd")
      r.collect().foreach(println)
      /* Result:
      (even,CompactBuffer((6,B1)))
      (odd,CompactBuffer((3,A), (7,Z1), (9,E), (7,F), (9,Y), (77,Z), (31,X)))
       */

      // The implicit partitioner here is a HashPartitioner.
      val r2 = z1.groupBy(_._1 % 2)
      r2.collect().foreach(println)
      /* Result:
      (0,CompactBuffer((6,B1)))
      (1,CompactBuffer((3,A), (7,Z1), (9,E), (7,F), (9,Y), (77,Z), (31,X)))
       */

      class MyPartitioner extends Partitioner {
        override def numPartitions: Int = 3

        // Fix: the original used `key % numPartitions`, which is negative for
        // negative Int keys (and negative hashCodes), an invalid partition
        // index for Spark. floorMod always yields [0, numPartitions).
        override def getPartition(key: Any): Int = key match {
          case null   => 0
          case k: Int => math.floorMod(k, numPartitions)
          case k      => math.floorMod(k.hashCode, numPartitions)
        }

        // All MyPartitioner instances partition identically, so they compare equal.
        override def equals(other: Any): Boolean = other.isInstanceOf[MyPartitioner]

        // Fix: equals was overridden without hashCode; keep the contract consistent.
        override def hashCode(): Int = numPartitions
      }

      println("=======================GroupBy with Partitioner====================")
      // Group and partition at the same time; the partitioner receives the
      // grouping function's result (the key) to place each group.
      val r3 = z1.groupBy((x: (Int, String)) => x._1, new MyPartitioner())
      r3.collect().foreach(println)
      /*
      // Partition 0: keys 6, 3, 9; partition 1: keys 7, 31; partition 2: key 77
      (6,CompactBuffer((6,B1)))
      (3,CompactBuffer((3,A)))
      (9,CompactBuffer((9,E), (9,Y)))
      (7,CompactBuffer((7,Z1), (7,F)))
      (31,CompactBuffer((31,X)))
      (77,CompactBuffer((77,Z)))
       */
    } finally {
      // Release the SparkContext even when a job above fails.
      sc.stop()
    }
  }
}

 

 collect

 

 

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object CollectTest_07 {
  /**
   * Demonstrates `RDD.collect()`, the action that materializes the whole RDD
   * as a local array on the driver.
   */
  def main(args: Array[String]): Unit = {
    // Fix: the app name was copy-pasted as "CoGroupTest_05" in the original.
    val conf = new SparkConf().setMaster("local").setAppName("CollectTest_07")
    val sc = new SparkContext(conf)
    try {
      val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)

      /**
       * Return an array that contains all of the elements in this RDD.
       */
      // collect() is an action: it triggers evaluation of the whole lineage.
      z1.collect().foreach(println)

      /**
       * Return an RDD that contains all matching values by applying `f`.
       */
      //    def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {
      //      filter(f.isDefinedAt).map(f)
      //    }

      // The partial-function overload needs an explicit type ascription for
      // the literal to compile (the original commented snippet omitted it):
      //    val f: PartialFunction[(Int, String), (Int, String)] = {
      //      case x @ (_: Int, _: String) => x
      //    }
      //    val z2 = z1.collect(f)
      //    z2.collect().foreach(println)
    } finally {
      // Release the SparkContext even when the job above fails.
      sc.stop()
    }
  }
}

 

RDD有个toArray方法,已经不推荐使用了,推荐使用collect方法

 

 

 

 

 

 

 

 

 

 

【Spark八十】Spark RDD API二

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
什么是RDD Spark是围绕着RDD(Resilient Distributed Dataset,弹性分布式数据集)建立起来的,也就是
什么是RDD Spark是围绕着RDD(Resilient Distributed Dataset,弹性分布式数据集)建立起来的,也就是
本文对Sogou的日志进行分析,Sogou日志下载地址. http://download.labs.sogou.com/dl/sogoulabdown/
本文对Sogou的日志进行分析,Sogou日志下载地址. http://download.labs.sogou.com/dl/sogoulabdown/
1、什么是RDD? 上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的inpu
1、什么是RDD? 上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的inpu
1、什么是RDD? 上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的inpu
1、什么是RDD? 上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的inpu
1背景介绍 现今分布式计算框架像MapReduce和Dryad都提供了高层次的原语,使用户不用操心任务分发和
本文目的 最近在使用Spark进行数据清理的相关工作,初次使用Spark时,遇到了一些挑(da)战(ken)
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号