Spark2.x精通:Standalone模式Master节点启动源码剖析

微信公众号:大数据开发运维架构

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

如果您觉得“大数据开发运维架构”对你有帮助,欢迎转发朋友圈

从微信公众号拷贝过来,格式有些错乱,建议直接去公众号阅读


对于Spark生产环境部署模式通常是Spark Standalone或Spark On Yarn,这里我们跟踪下源码,分析Standalone模式下Master节点的启动流程,已经相关的初始化流程:

源码版本:

    Spark2.2.0

1.Master启动脚本为spark-master.sh:

/home/cuadmin/ljs/spark2.2.1/sbin/start-master.sh

脚本里面主要干了几件事:

1).指定Master加载主函数为:

28 CLASS="org.apache.spark.deploy.master.Master"

2).调用配置文件加载脚本:

42 . "${SPARK_HOME}/sbin/spark-config.sh"

43

44 . "${SPARK_HOME}/bin/load-spark-env.sh"

3).指定SPARK_MASTER_HOST、SPARK_MASTER_HOST、SPARK_MASTER_HOST三个参数默认值:

46 if [ "$SPARK_MASTER_PORT" = "" ]; then

47  SPARK_MASTER_PORT=7077

48 fi

49

50 if [ "$SPARK_MASTER_HOST" = "" ]; then

51  case `uname` in

52      (SunOS)

53          SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"

54          ;;

55      (*)

56          SPARK_MASTER_HOST="`hostname -f`"

57          ;;

58  esac

59 fi

60

61 if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then

62  SPARK_MASTER_WEBUI_PORT=8080

63 fi

4).最后调用命令spark-daemon.sh start启动Master:

65 "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \

66  --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \

67  $ORIGINAL_ARGS

2.下面我们去看下spark-daemon.sh脚本到底干了啥事:

1).前面一部分制定了日志LOG和进程PID文件:

118 log="$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out"

119 pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"

2).    后面的代码就一行比较重要,这里启动调用的代码178行:

176  case "$mode" in

177    (class)

178      execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@"

179      ;;

180

181    (submit)

182      execute_command nice -n "$SPARK_NICENESS" bash "${SPARK_HOME}"/bin/spark-submit --class "$command" "$@"

183      ;;

184

185    (*)

把参数加进去大体就是执行了这么一行东西:

nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.master.Master "$@" >> "$log" 2>&1 < /dev/null &

其实还是调用的org.apache.spark.deploy.master.Master这个主类,我们到源码找到对应的main函数:

  def main(argStrings: Array[String]) {

  //log的初始化

    Utils.initDaemon(log)

    //加载默认配置

    val conf = new SparkConf

    //加载命令行参数

    val args = new MasterArguments(argStrings, conf)

  //这里才是重点,启动master进程

    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)

    rpcEnv.awaitTermination()

  }

下面是MasterArguments()函数,这里主要解析命令行传递的参数,覆盖默认参数,比如默认端口7077;

    MasterArguments类在new一个对象的时候会先读取spark-env.sh中的配置信息,然后再读取spark-defaults.conf中的数据,然后在获取通过shell传入的参数,如果有重复的值,那么后面的会覆盖前面的值。所以spark中配置信息获取的优先性为:传入参数 > spark-defaults.conf > spark-env.sh构建Master的参数。

我这只粘贴比较重要的代码:

private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging {

  var host = Utils.localHostName()

  var port = 7077

  var webUiPort = 8080

  var propertiesFile: String = null

  // Check for settings in environment variables

  if (System.getenv("SPARK_MASTER_IP") != null) {

    logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_HOST")

    host = System.getenv("SPARK_MASTER_IP")

  }

  if (System.getenv("SPARK_MASTER_HOST") != null) {

    host = System.getenv("SPARK_MASTER_HOST")

  }

  if (System.getenv("SPARK_MASTER_PORT") != null) {

    port = System.getenv("SPARK_MASTER_PORT").toInt

  }

  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {

    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt

  }

  parse(args.toList)

  // This mutates the SparkConf, so all accesses to it must be made after this line

  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  if (conf.contains("spark.master.ui.port")) {

    webUiPort = conf.get("spark.master.ui.port").toInt

  }

  @tailrec

  private def parse(args: List[String]): Unit = args match {

    case ("--ip" | "-i") :: value :: tail =>

    ......

    case ("--host" | "-h") :: value :: tail =>

    ......

    case ("--port" | "-p") :: IntParam(value) :: tail =>

    ......

    case "--webui-port" :: IntParam(value) :: tail =>

    ......

    case ("--properties-file") :: value :: tail =>

    ......

    case ("--help") :: tail =>

    ......

    case Nil => // No-op

    ......

    case _ =>

      printUsageAndExit(1)

  }

3.startRpcEnvAndEndpoint()才是重点,这里启动Master进程,这里重要剖析,函数体很短就几行代码,主要干了三件事:

    1).创建RPC的server端 RpcEnv

    2).创建RPC的client端 - Endpoint 与master这个client端进行通信

    3).向 Master 的通信终端发法请求,获取 BoundPortsResponse 对象

/**

  *  启动Master返回三个值 RpcEnv  webUI端口和Rest服务端口

  * Start the Master and return a three tuple of:

  *  (1) The Master RpcEnv

  *  (2) The web UI bound port

  *  (3) The REST server bound port, if any

  */

  def startRpcEnvAndEndpoint(

      host: String,

      port: Int,

      webUiPort: Int,

      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {

      //是否启用安全加密,由参数 spark.authenticate.secret控制,默认为false不启用

    val securityMgr = new SecurityManager(conf)

    //创建RPC的server端

    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)

    //创建RPC的client端 - Endpoint 与master这个client端进行通信

    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,

      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)

    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)

  }

      这里主要创建了rpcEnv、masterEndpoint,设计到Spark核心的RPC通信只是这里补充一点知识:

    RpcEnv、RpcEndPoint、RpcEndpointRef为核心的新型架构下的RPC通信方式,在底层封装了Akka和Netty,为未来扩充更多的通信系统提供了可能。RpcEnv是一个更宏观的Env,是Spark集群Rpc通信的基础服务环境,因此在集群启动时候所有的节点(无论Master还是Worker)都会创建一个RpcEnv,然后将该节点注册到RpcEnv中。RpcEnv是RPC的环境,所有的RpcEndpoint都需要注册到RpcEnv实例对象中,管理着这些注册的RpcEndpoint的生命周期。

    上面 RpcEnv.create()函数最后调用通过 NettyRpcEnvFactory 创建出一个 RPC Environment ,其具体类是 NettyRpcEnv

def create(config: RpcEnvConfig): RpcEnv = {

    val sparkConf = config.conf

    // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support

    // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance

    val javaSerializerInstance =

      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]

    val nettyEnv =

      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,

        config.securityManager)

    if (!config.clientMode) {

      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>

        nettyEnv.startServer(config.bindAddress, actualPort)

        (nettyEnv, nettyEnv.address.port)

      }

      try {

        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1

      } catch {

        case NonFatal(e) =>

          nettyEnv.shutdown()

          throw e

      }

    }

    nettyEnv

  }

第10行config.clientMode这里回去判断是否为是否为服务端,Master启动是服务端clientMode=false,Worker启动是客户端clientMode=true;虽然服务端和客户端都会通过这个方法创建一个 NettyRpcEnv ,但区别就在这里了是否启动服务。如果是服务端就调用第12行这个函数

nettyEnv.startServer(config.bindAddress, actualPort),函数体代码:

def startServer(bindAddress: String, port: Int): Unit = {

    val bootstraps: java.util.List[TransportServerBootstrap] =

      if (securityManager.isAuthenticationEnabled()) {

        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))

      } else {

        java.util.Collections.emptyList()

      }

    server = transportContext.createServer(bindAddress, port, bootstraps)

    //在每个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的作用是客户端调用的时候先用它来询问 Endpoint 是否存在。

    dispatcher.registerRpcEndpoint(

      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))

  }

    主要的功能是创建 RPCEnv ,即 NettyRpcEnv 。客户端不执行。这个函数里面主要干了两个事情:

1).创建 NettyRpcEnv

2).Dispatcher 注册 RpcEndpoint。 Dispatcher 的主要作用是保存注册的RpcEndpoint、分发相应的Message到RpcEndPoint中进行处理。Dispatcher 即是上图中 ThreadPool的角色。它同时也维系一个 threadpool,用来处理每次接受到的 InboxMessage 。而这里处理 InboxMessage 是通过 inbox 实现的。

    第16行Utils.startServiceOnPort()主要是去绑定Server需要的端口,一般我们代码本地调试的时候日志会先去绑定4040,绑定不成功再去绑定4041,直到达到最大重试次数,就是这个函数打印的日志。

4.startRpcEnvAndEndpoint()创建完RpcEnv后,

    会执行rpcEnv.setupEndpoint(),创建Master对应的RpcEndpoint,并将其注册到RpcEnv中。

val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,

      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

5.最后startRpcEnvAndEndpoint()调用masterEndpoint.askSync()函数,

向 Master 的通信终端发送请求,获取 BoundPortsResponse 对象,

BoundPortsResponse 是一个样例类包含三个属性: rpcEndpointPort webUIPort restPort。

val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)

    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)

至此startRpcEnvAndEndpoint()函数执行完成。

6.由于Master继承了ThreadSafeRpcEndpoint,重写里面的onStart方法,在启动的时候会执行onStart方法,他主要做了一下几件事:

1).构建web ui 和 启动rest server

2).守护线程启动一个调度机制,定期检查Worker是否超时

3).进行Master HA相关的操作

源代码如下:

override def onStart(): Unit = {

    logInfo("Starting Spark master at " + masterUrl)

    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")

    //构建webUI

    webUi = new MasterWebUI(this, webUiPort)

    webUi.bind()

    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort

    //是否启用反向代理

    if (reverseProxy) {

      masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)

      logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +

      s"Applications UIs are available at $masterWebUiUrl")

    }

    //启动一个定时调度机制,为检查是否有超时的Worker,发送CheckForWorkerTimeOut消息,

    // receive 方法中对 CheckForWorkerTimeOut 进行处理

    // 由case CheckForWorkerTimeOut接收消息,调用函数 timeOutDeadWorkers()处理

    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {

      override def run(): Unit = Utils.tryLogNonFatalError {

        self.send(CheckForWorkerTimeOut)

      }

    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

    //判断是启动rest server

    if (restServerEnabled) {

      val port = conf.getInt("spark.master.rest.port", 6066)

      restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))

    }

    restServerBoundPort = restServer.map(_.start())

    masterMetricsSystem.registerSource(masterSource)

    masterMetricsSystem.start()

    applicationMetricsSystem.start()

    // Attach the master and app metrics servlet handler to the web ui after the metrics systems are

    // started.

    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

    val serializer = new JavaSerializer(conf)

    //根据参数spark.deploy.recoveryMode配置的值,调用系统恢复函数处理

    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {

      case "ZOOKEEPER" =>

        logInfo("Persisting recovery state to ZooKeeper")

        val zkFactory =

          new ZooKeeperRecoveryModeFactory(conf, serializer)

        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))

      case "FILESYSTEM" =>

        val fsFactory =

          new FileSystemRecoveryModeFactory(conf, serializer)

        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))

      case "CUSTOM" =>

        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))

        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])

          .newInstance(conf, serializer)

          .asInstanceOf[StandaloneRecoveryModeFactory]

        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))

      case _ =>

        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))

    }

    persistenceEngine = persistenceEngine_

    leaderElectionAgent = leaderElectionAgent_

  }

    最后Master会启动receive方法,会不断的执行,用于接收actor发送过来的请求,消息循环处理机制我会在下一篇文章单独讲解,谢谢关注!!!

你可能感兴趣的