当前位置:首页 > 开发 > 开源软件 > 正文

【spark八十七】给定Driver Program, 如何判断哪些代码在Driver运行,哪些代码在Worker上执行

发表于: 2015-04-02   作者:bit1129   来源:转载   浏览:
摘要: Driver Program是用户编写的提交给Spark集群执行的application,它包含两部分 作为驱动: Driver与Master、Worker协作完成application进程的启动、DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等。 计算逻辑本身,当计算任务在Worker执行时,执行计算逻辑完成application的计算任务

Driver Program是用户编写的提交给Spark集群执行的application,它包含两部分

  • 作为驱动: Driver与Master、Worker协作完成application进程的启动、DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等。
  • 计算逻辑本身,当计算任务在Worker执行时,执行计算逻辑完成application的计算任务

接下来的问题是,给定一个driver programming,哪些作为"驱动代码"在Driver进程中执行,哪些"任务逻辑代码”被包装到任务中,然后分发到计算节点进行计算?

 

1. 基本Spark driver application

package spark.examples.databricks.reference.apps.loganalysis

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._


object LogAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer in Scala").setMaster("local[3]")
    val sc = new SparkContext(sparkConf)


    //logFile path is provided by the program args



    val logFile = if (args != null && args.length == 1) args(0) else "E:\\softwareInstalled\\Apache2.2\\logs\\access.log"

    //transform each line of the logFile into ApacheAccessLog object,
    //RDD[T],T is of type ApacheAccessLog
    //Because it will be used more than more, so cache it.
    ////从数据源中读取文本文件内容,每一行转换为ApacheAccessLog,然后进行cache
    val accessLogs = sc.textFile(logFile).map(ApacheAccessLog.parseLogLine).cache()

    // Calculate statistics based on the content size.
    //Retrieve the contentSize column and cache it
    val contentSizes = accessLogs.map(log => log.contentSize).cache()
    ///reduce是个action,count是个action
    println("Content Size Avg: %s, Min: %s, Max: %s".format(
      contentSizes.reduce(_ + _) / contentSizes.count,
      contentSizes.min,
      contentSizes.max))

    // Compute Response Code to Count.
    //Take first 100 responseCode, no sort here
    //take操作是个action
    val responseCodeToCount = accessLogs
      .map(log => (log.responseCode, 1))
      .reduceByKey(_ + _)
      .take(100)
    println( s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""")

    // Any IPAddress that has accessed the server more than 10 times.
    //take操作是个action
    val ipAddresses = accessLogs
      .map(log => (log.ipAddress, 1))
      .reduceByKey(_ + _)
      .filter(_._2 > 10) //Get the ipAddress that accessed the server 10+ times
      .map(_._1) //Map to the IP Address column
      .take(100)
    println( s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""")

    // Top Endpoints.
    //top操作是个action
    val topEndpoints = accessLogs
      .map(log => (log.endpoint, 1))
      .reduceByKey(_ + _)
      .top(10)(OrderingUtils.SecondValueOrdering)
    println( s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""")

    sc.stop()
  }
}

 

Spark application首先在Driver上开始运行main函数,执行过程中。 计算逻辑的开始是从读取数据源(比如HDFS中的文件)创建RDD开始,RDD分为transform和action两种操作,transform使用的是懒执行,而action操作将会触发Job的提交。

Job的提交以为着DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等,这是真正的将任务分发的开始。因此,从代码中首先要识别出哪些action。

另外一个问题, 任务执行过程中执行的逻辑代码如何识别?

Application在Driver上执行,遇到RDD的action动作后,开始提交作业,当作业执行完成后,后面的作业陆续提交,也就是说,虽然一个Spark Application可以有多个Job(每个action对应一个Job),这些Job是顺序执行的,Job(X)执行完成才会执行Job(X+1)。

 

遇到Job执行,比如上面的contentSize.reduce(_+_),那么所谓的计算逻辑就是函数 _+_,这是个求和操作。具体的任务逻辑执行时,还是从action回溯RDD,中间经过转换得到最终的这个Task所属的Partition的数据,然后执行reduce(_+_)

 

2. Spark Stream程序

 

package spark.examples.streaming

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._


//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?


object SparkStreamingForPartition {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    //foreachRDD是DStream的动作函数,会触发Job执行,然后对一个时间间隔内创建的RDD进行处理。如果RDD执行RDD的动作函数,是否继续触发Job执行?
    dstream.foreachRDD(rdd => {
      //embedded function
      def func(records: Iterator[String]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://192.168.26.140:3306/person";
          val user = "root";
          val password = ""
          conn = DriverManager.getConnection(url, user, password)
          records.flatMap(_.split(" ")).foreach(word => {
            val sql = "insert into TBL_WORDS(word) values (?)";
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, word)
            stmt.executeUpdate();
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      ///对RDD进行重新分区,以改变处理的并行度
      val repartitionedRDD = rdd.repartition(3)
      ///对每个分区调用func函数,func函数的参数就是一个分区对应的数据的遍历器(Iterator)
      repartitionedRDD.foreachPartition(func)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

DStream的foreachRDD是个Output Operation,类似于RDD的action,因此,高阶函数foreachRDD的函数参数,将在worker上执行。这里的func是定义为foreachRDD参数函数的内部函数,因此会发送到Worker上执行,如果func定义在最外层,比如作为main函数的直接内部函数,是否可以顺利的从Driver序列化到Worker上呢?我认为是可以的。函数在Scala中只是一个普通的对象,没有状态,序列化反序列化时需要创建MySQL的Connection。

 

3. foreachRDD中的rdd继续执行action算子

 

package spark.examples.streaming

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkStreamingForPartition2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val dstream = ssc.socketTextStream("192.168.26.140", 9999)
    dstream.foreachRDD(rdd => {
      //对于空RDD调用flatMap报错,这里进行判断
      if (!rdd.isEmpty()) {
        ///RDD的Record个数
        val recordCount = rdd.count()
        ///RDD中的单词数
        val wordCount = rdd.flatMap(_.split(" ")).map(word => 1).reduce(_ + _)
        println("recordCount: =" + recordCount + "," + "wordCount:=" + wordCount)
      } else {
        println("Empty RDD, No Data")
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 需要注意的是,对于没有任何元素的RDD(RDD.isEmpty为true),那么不能执行转换算子如flatMap等,这在Spark Streaming中读取空的RDD是很常见的情况?但是这个空检查是在RDD的reduce函数中执行的

 

  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

 

如下操作对于空RDD是可以的,返回0

  rdd.flatMap(_.split(" ")).map(_ => 1).count()

 

 

 

【spark八十七】给定Driver Program, 如何判断哪些代码在Driver运行,哪些代码在Worker上执行

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
想在室内营造一点小氛围不,话说工程师工作繁忙也得自己造些情调不是。这我是在pcDuino上做的,接一
在Spark Standalone集群模式下,Driver运行在客户端,所谓的客户端就是提交代码的那台机器。在Stand
在Spark Standalone集群模式下,Driver运行在客户端,所谓的客户端就是提交代码的那台机器。在Stand
如何查看一个运行的exe执行程序需要有哪些DLL动态链接库 第一步: 打开“360安全卫士”软件 笔者的3
Program for Linux USB-devices driver 开始啃硬骨头~ 这里我打算一步步给出USB device driver 的de
打开IDEA 在src下的main下的scala下右击创建一个scala类 名字为SimpleApp ,内容如下 import org.apa
打开IDEA 在src下的main下的scala下右击创建一个scala类 名字为SimpleApp ,内容如下 import org.apa
打开IDEA 在src下的main下的scala下右击创建一个scala类 名字为SimpleApp ,内容如下 import org.apa
打开IDEA 在src下的main下的scala下右击创建一个scala类 名字为SimpleApp ,内容如下 import org.apa
  Mongodb 是采用 Scons 来构建的。Scons是一个Python写的自动化构建工具,从构建这个角度说,它
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号