spark源码分析and结构拆解(1)——消息通信原理!!

消息通信原理主要分为三大部分
-------------------------------------------1.spark消息通信架构----------------------------------------------------
 

    spark在各个模块中间例如Master,Worker这些东西,一般都是使用Rpc的静态方法创建RpcEnv实例,然后实例化master,由于master继承了ThreadSafeRpcEndpoint,这个创建的就是master实例的安全的终端点,接着调用了RpcEnv启动这个终端点,然后把这个终端点和其对应的引用注册到RpcEnv里面,所以在之后的消息通信里面其他想和master通信的直接获取终端点的引用就ok了,其他也是这样....

spark源码分析and结构拆解(1)——消息通信原理!!_第1张图片

-------------------------------------------2.spark的启动消息通信---------------------------------------------------
    spark的消息启动主要是master和worker之间的通信,消息发送如下图,一个步
    一,worker向master发送注册信息(RegisterWorker),等待master处理结束!
当master和worker启动之后,worker会自动建立通信环境RpcEnv和终端点EndPoint,并且会向master发送注册信息(register),这里有点需要注意就是我们使用HA模式的话master可能会有多个这个时候就需要在worker里面建立一个注册的线程池,那需要申请注册的请求放在这个线程池里面,然后直接启动线程池来进行注册,这样很方便,这个注册过程就是获取Master的终端点的引用,调用registterWithMaster方法,根据终端点的send方法发送注册RegisterWork消息然后就是下一步
    二,返回注册成功还是失败
master接收到消息之后会首先对信息进行验证,如果成功会发送信息告诉成功然后worker会定期发送心跳,如果失败也会发送然后worker会把错误信息打印,在masetr里面接收到了注册信息之后会首先判断自己是不是处于standby模式,如果是则忽略,然后会在注册表里面查询是否由当前worker编号,如果有就会报错。都判断完成之后使用registerWorker方法吧worke加入到注册列表里,用于预备处理的数据
    三,worker会定时发送心跳给master

当worker接收到成功信息之后会定时发送心跳hearbeat给master,方便master了解Worker状态。(注意timeout的四分之一才是心跳间隔哦)

spark源码分析and结构拆解(1)——消息通信原理!!_第2张图片

--------------------------------------3.spark运行时的消息通信---------------------------------------------------

    这个过程一共有八步,应用程序SparkContext向mater发送注册信息,并有master该应用分配Exectour,exectour启动之后会向SparkContext发送注册成功消息(这里的注册时exectour前面注册时app,并不一样)然SparkContext的rdd触发Action之后会形成一个DAG,通过DAGScheduler进行划分Stage并将其转化成TaskSet,然后TaskScheduler向Executor发送执行消息,Executor接收到信息之后启动并且运行,最后是由Driver处理结果,详解如下图:

spark源码分析and结构拆解(1)——消息通信原理!!_第3张图片

    一,执行SparkContext的时候会实例化一个Schedulerbackend对象,在这个对象启动中会继承父类DriverEndpoint和创建AppClient的ClientEndpoint这两个终端点,在ClientEntpoint里面创建线程池主动向master发送注册请求,当master接收到注册请求时候会在registerApplication方法中记录应用信息并把它们加入到等待运行的列表里面,然后就是注册成功并且会发一个成功的消息给ClientEndpoint同时调用startExecutorOnworker方法来运行,然后将LaunchExecutor给worker,让workeer启动executor(有三点默认使用fifo然后找出剩余内存大于等于启动executor需要的内存核数大于1的worker)

 

    二,AppClient。ClienntEndpoint接收到master发送的RegisterApplication消息时需要把注册表示registered设置为turn    ,然后master发现注册线程获取状态发生变化后完成Applcation

 

    三,在master类的startExecutorOnWorker方法中配置运行程序时调用allocateWorkerResourceToExecuutor方法来实现在worker中启动Executor。这个时候worker会收到来自Master发送的LaunchExecutor消息,然后在worker上实例化Executor对象,然后这个对象创建进程生成器processBuilder.command创建CGEB对象,这个对象就是execuyor的容器,最后worker发送消息给master说都创建完毕。

 

    四,master接收worker发送的ExecutorStateChanged消息,根据ExecuteState。然后向Driver发送ExecutorUpdated消息

 

  五,在步骤三里面CGEB启动方法onstart中,会发送注册Exceutor消息到RegisterExecutor给DirverEndpoint,DirverEndpoint的终端点会判断这个Executor是否存在,如果失败就发送RegisterExecutorfailed否则这个diver就会记录这个executor信息,发送RegisterExecutor,然后在makeoffers()方法分配运行任务最后发送lunchTesk消息执行任务

 

    六,当CGEB接收到execuotor注册成功消息时候在这个容器里面实例化Executor对象,这个Executor对象也会向Driver发送心跳(10s),而且等待接收从DriverEndpoint接收的执行任务,

 

    七,上面的DriverEndpoint终端点发送的lunchTask执行任务消息,任务的执行时Exceutor的lunchtesk实现的,在执行的时候会创建TaskRuner进程,又该进程进行任务处理处理结束之后发送statusupdate到CGEB

 

    八,在Exceutor的lunchtask方法执行完成之后,会向DirverEndpoint终端点发送状态变更statusUpdate消息,接收到的时候对结果进行处理
 

你可能感兴趣的