
[Spark102] Dissecting Spark's Storage Module: BlockManager

Published: 2015-06-06   Author: bit1129   Source: repost

Spark builds its storage module around BlockManager: the storage of RDD, Shuffle, and Broadcast data all goes through it. BlockManager is implemented as a per-application Master/Slave structure: the BlockManager on the Driver plays the Master role, while the BlockManagers on the Slaves (at application scope, the Executors) play the Slave role.

Accordingly, there is a BlockManager on the Driver and on each Executor of the application. The Driver-side BlockManager does not actually store data; it records the state of every Executor's BlockManager. (Reading the source of BlockManagerMaster and BlockManagerMasterActor, neither holds a BlockManager object, so where is each Executor BlockManager's state kept? The BlockManager class comment confirms that a BlockManager does in fact also run on the Driver.) Communication between the Master BlockManager and the Executor BlockManagers is based on Akka, with the message formats defined in BlockManagerMessages.

 

The description above is not quite accurate: in fact the Driver, just like each Executor, has a BlockManager of its own. In addition, the Driver hosts the BlockManager master, implemented by the BlockManagerMaster class. So as far as BlockManager is concerned, the Driver acts as both Master and Slave.
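
To make that layout concrete, here is a minimal, self-contained sketch; the class names and fields are simplified stand-ins for Spark's real ones, not Spark's actual code. One BlockManagerMaster lives on the Driver, and one BlockManager instance lives on the Driver and on every Executor, each registering itself with the master.

import scala.collection.mutable

// Simplified stand-in: in real Spark, BlockManagerId also carries more state.
case class BlockManagerId(executorId: String, host: String, port: Int)

// Lives only on the Driver: tracks every registered BlockManager.
class BlockManagerMaster {
  private val registered = mutable.HashMap[String, BlockManagerId]()
  def register(id: BlockManagerId): Unit = {
    registered(id.executorId) = id
    println(s"registered BlockManager of executor ${id.executorId} at ${id.host}:${id.port}")
  }
}

// Lives on the Driver AND on every Executor; registers itself on construction.
class BlockManager(executorId: String, host: String, port: Int, master: BlockManagerMaster) {
  master.register(BlockManagerId(executorId, host, port))
}

object PerAppLayout extends App {
  val master = new BlockManagerMaster                        // Driver side only
  new BlockManager("<driver>", "driver-host", 50001, master) // the Driver's own BlockManager
  new BlockManager("1", "worker-1", 50002, master)           // Executor 1
  new BlockManager("2", "worker-2", 50003, master)           // Executor 2
}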

 

0. The BlockManager class comment:

 

/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that #initialize() must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(

 

 

1. Messages between Master and Slaves

The messages the Master BlockManager can send to an Executor BlockManager are:

 

 sealed trait ToBlockManagerSlave

  // Remove a block from the slaves that have it. This can only be used to remove
  // blocks that the master knows about.
  case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific RDD.
  case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific shuffle.
  case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific broadcast.
  case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
    extends ToBlockManagerSlave
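
To illustrate how a slave reacts to these commands, here is a self-contained sketch with simplified message types (BlockId reduced to a String) and println stubs standing in for the real calls into the local BlockManager; this is not Spark's actual receive logic.

object SlaveDispatch extends App {
  type BlockId = String  // simplification: Spark has a real BlockId hierarchy
  sealed trait ToBlockManagerSlave
  case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave
  case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
  case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave
  case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
    extends ToBlockManagerSlave

  // Dispatch over the slave-side commands; the bodies are stubs.
  def handle(msg: ToBlockManagerSlave): Unit = msg match {
    case RemoveBlock(id)         => println(s"remove block $id from the local BlockManager")
    case RemoveRdd(rddId)        => println(s"remove all local blocks of rdd_$rddId")
    case RemoveShuffle(sid)      => println(s"remove local data of shuffle $sid")
    case RemoveBroadcast(bid, d) => println(s"remove broadcast $bid (removeFromDriver=$d)")
  }

  Seq(RemoveBlock("rdd_0_1"), RemoveRdd(0), RemoveShuffle(2), RemoveBroadcast(3L))
    .foreach(handle)
}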

 

The messages an Executor BlockManager can send to the Master BlockManager are:

 

 

 sealed trait ToBlockManagerMaster

  case class RegisterBlockManager(
      blockManagerId: BlockManagerId,
      maxMemSize: Long,
      sender: ActorRef)
    extends ToBlockManagerMaster

  // Ask which executors' BlockManagers hold the given block
  case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster

  // Ask for the locations of a set of blocks at once
  case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
 
  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

  case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster

  // Remove an executor from the master's bookkeeping
  case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

  
  case object StopBlockManagerMaster extends ToBlockManagerMaster

  case object GetMemoryStatus extends ToBlockManagerMaster

  case object GetStorageStatus extends ToBlockManagerMaster

  case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
    extends ToBlockManagerMaster

  case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)
    extends ToBlockManagerMaster

  case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

  case object ExpireDeadHosts extends ToBlockManagerMaster

  // Report a block's updated state (storage level, sizes in memory/disk/Tachyon)
  case class UpdateBlockInfo(
      var blockManagerId: BlockManagerId,
      var blockId: BlockId,
      var storageLevel: StorageLevel,
      var memSize: Long,
      var diskSize: Long,
      var tachyonSize: Long)
    extends ToBlockManagerMaster
    with Externalizable {

    def this() = this(null, null, null, 0, 0, 0)  // For deserialization only

    override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
      blockManagerId.writeExternal(out)
      out.writeUTF(blockId.name)
      storageLevel.writeExternal(out)
      out.writeLong(memSize)
      out.writeLong(diskSize)
      out.writeLong(tachyonSize)
    }

    override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
      blockManagerId = BlockManagerId(in)
      blockId = BlockId(in.readUTF())
      storageLevel = StorageLevel(in)
      memSize = in.readLong()
      diskSize = in.readLong()
      tachyonSize = in.readLong()
    }
  }
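
Note how UpdateBlockInfo implements Externalizable by hand: mutable fields plus a no-arg constructor, so the receiving side can instantiate the object and then fill it in from the stream. Below is a stand-alone round-trip demo of the same pattern, using plain String/Long fields instead of Spark's BlockManagerId and StorageLevel (illustrative, not Spark code).

import java.io._

// Mirrors UpdateBlockInfo's serialization pattern with simplified fields.
class BlockUpdate(var blockId: String, var memSize: Long, var diskSize: Long)
  extends Externalizable {

  def this() = this(null, 0, 0) // required: deserialization calls the no-arg constructor

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeUTF(blockId)
    out.writeLong(memSize)
    out.writeLong(diskSize)
  }

  override def readExternal(in: ObjectInput): Unit = {
    blockId = in.readUTF()
    memSize = in.readLong()
    diskSize = in.readLong()
  }

  override def toString = s"BlockUpdate($blockId, mem=$memSize, disk=$diskSize)"
}

object RoundTrip extends App {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(new BlockUpdate("rdd_0_0", 1024L, 0L))
  out.close()

  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  println(in.readObject()) // BlockUpdate(rdd_0_0, mem=1024, disk=0)
}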

 

 

2. BlockManagerMasterActor

 

/**
 * BlockManagerMasterActor is an actor on the master node to track statuses of
 * all slaves' block managers.
 */
Data structures it maintains:

  // Mapping from block manager id to the block manager's information;
  // each Executor registers exactly one BlockManagerId.
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  // Mapping from executor ID to block manager ID.
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  // Mapping from block id to the set of block managers that have the block.
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 

 

BlockManagerId is a data structure composed of host, port, and executorId; this again shows that BlockManager is an Executor-scoped structure.
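
Here is a small self-contained sketch of how two of these maps evolve as block managers register and report blocks (blockManagerInfo is omitted for brevity; the types are simplified and this is not the actual BlockManagerMasterActor logic):

import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object MasterState extends App {
  type BlockId = String
  case class BlockManagerId(executorId: String, host: String, port: Int)

  // Same shapes as the master's fields above, with simplified types.
  val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
  val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

  // RegisterBlockManager: remember which BlockManagerId an executor uses.
  def register(id: BlockManagerId): Unit =
    blockManagerIdByExecutor(id.executorId) = id

  // UpdateBlockInfo: record that this executor now holds the block.
  def updateBlockInfo(id: BlockManagerId, blockId: BlockId): Unit = {
    var locations = blockLocations.get(blockId)
    if (locations == null) {                       // first report of this block
      locations = new mutable.HashSet[BlockManagerId]
      blockLocations.put(blockId, locations)
    }
    locations += id
  }

  register(BlockManagerId("1", "worker-1", 50001))
  register(BlockManagerId("2", "worker-2", 50002))
  updateBlockInfo(blockManagerIdByExecutor("1"), "rdd_0_0")
  updateBlockInfo(blockManagerIdByExecutor("2"), "rdd_0_0") // a replicated block

  // GetLocations("rdd_0_0") would answer with both BlockManagerIds:
  println(blockLocations.get("rdd_0_0"))
}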

 

 

3. BlockManagerSlaveActor

 

/**
 * An actor to take commands from the master to execute options. For example,
 * this is used to remove blocks from the slave's BlockManager.
 */

All of BlockManagerSlaveActor's state appears in its constructor, shown below: it holds this Executor's BlockManager, plus the Executor's MapOutputTracker, which records the map-side shuffle output.

private[storage]
class BlockManagerSlaveActor(
    blockManager: BlockManager,
    mapOutputTracker: MapOutputTracker)
  extends Actor with ActorLogReceive with Logging
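
One detail worth noting: the slave actor performs removals asynchronously in a Future, so the actor thread is never blocked on disk I/O, and the result is sent back to the master when the Future completes. A stripped-down, stand-alone illustration of that pattern (the method name and return value here are made up):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AsyncRemoval extends App {
  // Stub for the blocking work the real actor delegates to the BlockManager.
  def removeBlocksOfRdd(rddId: Int): Int = {
    println(s"scanning the local store for blocks of rdd_$rddId")
    2 // pretend two blocks were removed
  }

  // The removal runs off the caller's thread; the reply is sent on completion.
  val reply: Future[Int] = Future { removeBlocksOfRdd(0) }
  reply.foreach(n => println(s"removed $n blocks; replying to the master"))

  Thread.sleep(500) // keep the demo JVM alive long enough to see the output
}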

 

Untangling BlockManager's communication machinery turns out to be more involved than analyzing how it reads and writes data (which is delegated to DiskStore and MemoryStore, with DiskStore in turn built on DiskBlockManager), mainly for lack of a clear picture of what lives on the Driver, what lives on each Executor, and how the two sides talk to each other. That analysis will continue in a later post.

 

 

 

 

 
