[Spark 97] RDD API: aggregateByKey

1. How aggregateByKey works


  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    // Copy the serialized bytes out of the buffer into the array
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
  }
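
The two-phase behaviour described in the comment can be modeled without a cluster. The following is a minimal sketch in plain Scala, not Spark's implementation: aggregateByKeyLocal is a hypothetical helper, each inner Seq stands in for one partition, and the createZero factory stands in for the "deserialize a fresh clone of the zero value" step, so every key in every partition starts from its own accumulator.

```scala
import scala.collection.mutable

object AggregateByKeySketch {
  // Hypothetical local model of aggregateByKey; each inner Seq plays the role of one partition.
  // createZero stands in for Spark deserializing a fresh clone of zeroValue.
  def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]], createZero: () => U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Phase 1: within each partition, fold every value into its key's accumulator with seqOp.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        val current = acc.getOrElse(k, createZero())
        acc.updated(k, seqOp(current, v))
      }
    }
    // Phase 2: merge the per-partition accumulators of each key with combOp.
    perPartition.foldLeft(Map.empty[K, U]) { (merged, partial) =>
      partial.foldLeft(merged) { case (acc, (k, u)) =>
        acc.updated(k, acc.get(k).map(combOp(_, u)).getOrElse(u))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample data split into two "partitions".
    val partitions = Seq(
      Seq("a" -> 1, "b" -> 2, "a" -> 3),
      Seq("a" -> 3, "b" -> 5)
    )
    // The accumulator type (a mutable set) differs from the value type (Int),
    // and both functions mutate and return their first argument.
    val result = aggregateByKeyLocal(partitions, () => mutable.Set.empty[Int])(
      (set, v) => { set += v; set },
      (s1, s2) => { s1 ++= s2; s1 }
    )
    println(result.map { case (k, s) => k -> s.toList.sorted })
  }
}
```

In this sketch key "a" ends up with the values 1 and 3 (the duplicate 3 collapses in the set) and key "b" with 2 and 5. Because seqOp mutates its first argument, sharing a single mutable zero value across keys would mix their contents; handing each key its own copy, as createZero does in the source above, avoids that.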

2. aggregateByKey examples

2.1 Computing an average


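// Placeholder path to the weather data file
val rdd = sc.textFile("weather-data")
// Each line is "<date> <temperature>"; key each reading by year and month.
// Assumption: dates are formatted as yyyyMMdd, so the first 6 characters are the year and month.
val rdd2 = rdd.map(x => x.split(" ")).map(x => (x(0).substring(0, 6), x(1).toInt))
// Accumulator: (sum of temperatures, number of readings)
val zeroValue = (0, 0)
// Fold one reading into the accumulator within a partition
val seqOp = (u: (Int, Int), v: Int) => {
  (u._1 + v, u._2 + 1)
}
// Merge the accumulators of two partitions
val combOp = (u1: (Int, Int), u2: (Int, Int)) => {
  (u1._1 + u2._1, u1._2 + u2._2)
}
val rdd3 = rdd2.aggregateByKey(zeroValue)(seqOp, combOp)
// Convert to Double so the average is not truncated by integer division
rdd3.foreach(x => println(x._1 + ": average temperature is " + x._2._1.toDouble / x._2._2))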
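
To check the arithmetic by hand, the same seqOp and combOp logic can be run over a small in-memory dataset. This is a sketch in plain Scala with no Spark dependency: the readings and the two-partition split are made-up sample data, AverageSketch and aggregatePartition are hypothetical names, and groupBy, foldLeft and reduce only imitate what Spark does within and across partitions.

```scala
object AverageSketch {
  // Accumulator: (sum of temperatures, number of readings)
  val zeroValue = (0, 0)
  val seqOp = (u: (Int, Int), v: Int) => (u._1 + v, u._2 + 1)
  val combOp = (u1: (Int, Int), u2: (Int, Int)) => (u1._1 + u2._1, u1._2 + u2._2)

  // Made-up sample readings, already keyed by year-month and split into two "partitions".
  val partition1 = Seq("201501" -> 10, "201501" -> 20, "201502" -> 5)
  val partition2 = Seq("201501" -> 30, "201502" -> 6)

  // Within one partition: fold each key's values into a (sum, count) pair with seqOp.
  def aggregatePartition(part: Seq[(String, Int)]): Map[String, (Int, Int)] =
    part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zeroValue)(seqOp) }

  // Across partitions: merge the (sum, count) pairs of each key with combOp.
  val totals: Map[String, (Int, Int)] =
    (aggregatePartition(partition1).toSeq ++ aggregatePartition(partition2).toSeq)
      .groupBy(_._1)
      .map { case (k, kus) => k -> kus.map(_._2).reduce(combOp) }

  // Divide as Double so the average is not truncated.
  val averages: Map[String, Double] =
    totals.map { case (k, (sum, count)) => k -> sum.toDouble / count }

  def main(args: Array[String]): Unit =
    averages.toSeq.sortBy(_._1).foreach { case (k, avg) =>
      println(s"$k: average temperature is $avg")
    }
}
```

For key "201501" the first partition yields (30, 2) and the second (30, 1), which combOp merges into (60, 3), an average of 20.0. For "201502" the partial results (5, 1) and (6, 1) merge into (11, 2), an average of 5.5. Carrying the count alongside the sum is what lets partial results from different partitions be merged correctly; averaging per partition first and then averaging those averages would give a different answer whenever partitions hold different numbers of readings.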