
[Spark #26] Dissecting Spark Code

Published: 2015-01-21   Author: bit1129   Source: repost

Components initialized by SparkEnv

 

 

	org.apache.spark.shuffle.sort.SortShuffleManager   ///shuffleManager
	org.apache.spark.MapOutputTrackerMaster
	org.apache.spark.shuffle.ShuffleMemoryManager
	org.apache.spark.network.netty.NettyBlockTransferService
	org.apache.spark.MapOutputTrackerMaster@25e45d
	org.apache.spark.serializer.JavaSerializer@dc42ab   ///closureSerializer, serializer
	org.apache.spark.storage.BlockManager@16d5aa8
	org.apache.spark.storage.BlockManagerMaster@a62840
	org.apache.spark.network.netty.NettyBlockTransferService@148d5b2   //blockTransferService
	org.apache.spark.CacheManager@1ac9928
	org.apache.spark.HttpFileServer@131d67
	org.apache.spark.metrics.MetricsSystem@516ac3
	org.apache.spark.MapOutputTrackerMaster@25e45d
	org.apache.spark.broadcast.BroadcastManager@f8008d
	C:\Users\hadoop\AppData\Local\Temp\spark-7f0f46d9-28d0-4e8d-94d0-9a8f8f589d14   //sparkFilesDir


    // SparkEnv.create wires the components listed above into the SparkEnv constructor:
    new SparkEnv(
      executorId,
      actorSystem,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockTransferService,
      blockManager,
      securityManager,
      httpFileServer,
      sparkFilesDir,
      metricsSystem,
      shuffleMemoryManager,
      conf)
  }

 

Source code under analysis:

 

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCount {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2")
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("file:///D:/word.in")
    println(rdd1.toDebugString)
    val rdd2 = rdd1.flatMap(_.split(" "))   // split each line into words
    println("rdd2:" + rdd2.toDebugString)
    val rdd3 = rdd2.map((_, 1))             // pair each word with an initial count of 1
    println("rdd3:" + rdd3.toDebugString)
    val rdd4 = rdd3.reduceByKey(_ + _)      // sum the counts per word (introduces a shuffle)
    println("rdd4:" + rdd4.toDebugString)
    rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis())
    sc.stop()
  }
}
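Before looking at Spark's execution machinery, the transformation chain itself can be sanity-checked on plain Scala collections, which expose the same flatMap/map/reduce-by-key semantics. This sketch (with made-up input lines) only illustrates what each rddN computes, not how Spark distributes the work:

```scala
object WordCountOnCollections {
  def main(args: Array[String]): Unit = {
    val lines = Seq("hello world", "hello spark")      // stand-in for file:///D:/word.in

    val words  = lines.flatMap(_.split(" "))           // rdd2: one element per word
    val pairs  = words.map((_, 1))                     // rdd3: (word, 1) pairs
    val counts = pairs                                 // rdd4: equivalent of reduceByKey(_ + _)
      .groupBy(_._1)
      .map { case (word, ps) => (word, ps.map(_._2).sum) }

    println(counts.toList.sorted)
    // List((hello,2), (spark,1), (world,1))
  }
}
```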

 

The resulting RDD dependency graphs (toDebugString output) are:

RDD1

rdd1:(1) file:///D:/word.in MappedRDD[1] at textFile at SparkWordCount.scala:15 []
 |  file:///D:/word.in HadoopRDD[0] at textFile at SparkWordCount.scala:15 []

RDD2

rdd2:(1) FlatMappedRDD[2] at flatMap at SparkWordCount.scala:17 []
 |  file:///D:/word.in MappedRDD[1] at textFile at SparkWordCount.scala:15 []
 |  file:///D:/word.in HadoopRDD[0] at textFile at SparkWordCount.scala:15 []

RDD3

rdd3:(1) MappedRDD[3] at map at SparkWordCount.scala:19 []
 |  FlatMappedRDD[2] at flatMap at SparkWordCount.scala:17 []
 |  file:///D:/word.in MappedRDD[1] at textFile at SparkWordCount.scala:15 []
 |  file:///D:/word.in HadoopRDD[0] at textFile at SparkWordCount.scala:15 []

RDD4

rdd4:(1) ShuffledRDD[4] at reduceByKey at SparkWordCount.scala:21 []
 +-(1) MappedRDD[3] at map at SparkWordCount.scala:19 []
    |  FlatMappedRDD[2] at flatMap at SparkWordCount.scala:17 []
    |  file:///D:/word.in MappedRDD[1] at textFile at SparkWordCount.scala:15 []
    |  file:///D:/word.in HadoopRDD[0] at textFile at SparkWordCount.scala:15 []
In this output, `|` marks a narrow dependency, while `+-(1)` marks the shuffle boundary introduced by reduceByKey: MappedRDD[3] and everything below it run as ShuffleMapTasks, and the final stage reading ShuffledRDD[4] runs as ResultTasks.

In ResultTask.runTask, the deserialized func is what actually writes the results: for saveAsTextFile it is the writeToFile function defined in PairRDDFunctions.

 

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context)) // for saveAsTextFile, func is PairRDDFunctions.writeToFile, which writes this partition's results
  }
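With the default JavaSerializer, the `ser.deserialize` call above is essentially an ordinary Java-serialization round trip over the broadcast task bytes. A self-contained sketch of that mechanism, using a stand-in tuple in place of the real `(rdd, func)` pair:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TaskBinaryRoundTrip {
  def main(args: Array[String]): Unit = {
    // Stand-in payload; Spark serializes the (rdd, func) pair into taskBinary the same way
    val payload: (String, Int) = ("partition-data", 42)

    // Serialize to bytes (what the driver does when building taskBinary)
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(payload)
    oos.close()

    // Deserialize on the "executor" side (what runTask does via closureSerializer)
    val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[(String, Int)]

    println(restored) // (partition-data,42)
  }
}
```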

 

 

The writeToFile function in PairRDDFunctions, which completes the write for saveAsTextFile:

 val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
      val config = wrappedConf.value
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt

      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)

      writer.setup(context.stageId, context.partitionId, attemptNumber)
      writer.open()
      try {
        var recordsWritten = 0L
        while (iter.hasNext) {
          val record = iter.next()
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

          // Update bytes written metric every few records
          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
          recordsWritten += 1
        }
      } finally {
        writer.close()
      }
      writer.commit()
      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
    }
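The attempt-ID narrowing commented in the snippet above is plain modular arithmetic and can be checked in isolation (the IDs below are made-up examples):

```scala
object AttemptIdSketch {
  def main(args: Array[String]): Unit = {
    // Hadoop expects a 32-bit Int attempt ID, while Spark's attemptId is a Long,
    // so the code wraps it with a modulus before narrowing to Int.
    def toAttemptNumber(attemptId: Long): Int = (attemptId % Int.MaxValue).toInt

    println(toAttemptNumber(7L))                      // small IDs pass through: 7
    println(toAttemptNumber(Int.MaxValue.toLong + 5)) // large IDs wrap around: 5
  }
}
```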
 

 

 

 

 
