当前位置:首页 > 开发 > 开源软件 > 正文

【Spark八十三】BlockManager在Spark中的使用场景

发表于: 2015-03-30   作者:bit1129   来源:转载   浏览:
摘要: 1. Broadcast变量的存储,在HttpBroadcast类中可以知道 2. RDD通过CacheManager存储RDD中的数据,CacheManager也是通过BlockManager进行存储的 3. ShuffleMapTask得到的结果数据,是通过FileShuffleBlockManager进行管理的,而FileShuffleBlockManager最终也是使用BlockMan

1. Broadcast变量的存储,在HttpBroadcast类中可以知道

2. RDD通过CacheManager存储RDD中的数据,CacheManager也是通过BlockManager进行存储的

3. ShuffleMapTask得到的结果数据,是通过FileShuffleBlockManager进行管理的,而FileShuffleBlockManager最终也是使用BlockManager来对数据块进行管理

4. ResultTask的结果如果非常大,那么使用IndirectTaskResult作为返回结果给Driver,Driver通过IndirectTaskResult反查BlockManager获取数据,通过Netty的数据传输

代码在TaskRunner的run方法中。

 

       // directSend = sending directly back to the driver
        val serializedResult = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { //如果结果大于Akka传输的数据量(默认10M)
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes( //设置到BlockManager中,MEMORY_AND_DISK_SER表示
              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }

 

 5. Spark Streaming将接收到的数据首先写入到BlockManager中

 

关于StorageLevel的设置:

 

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)

五个参数:

使用磁盘存储

使用内存存储

使用Tachyon存储

deserialzed表示存储的数据是否是原始数据(没有进行序列化)?

副本数

 

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)

 

 

 

 

【Spark八十三】BlockManager在Spark中的使用场景

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
1 spark
环境: Hadoop版本:Apache Hadoop2.7.1 Spark版本:Apache Spark1.4.1 核心代码: 测试数据: Java
Spark本身用Scala语言编写,运行于Java虚拟机(JVM)。只要在安装了Java 6以上版本的便携式计算机或
国际版的Azure 已经可以正常支持Spark应用,但是当我们在使用中国版的Azure的时候,在Azure中国版官
相对于使用MapReduce或者Spark Application的方式进行数据分析,使用Hive SQL或Spark SQL能为我们省
现有的hadoop生态系统中存在的问题 1)使用mapreduce进行批量离线分析; 2)使用hive进行历史数据的
package org.apache.spark.sql.sources import org.apache.spark.SparkContext import java.sql.{Re
背景 使用spark开发已有几个月。相比于python/hive,scala/spark学习门槛较高。尤其记得刚开时,举
1. Spark中的基本概念 在Spark中,有下面的基本概念。 Application:基于Spark的用户程序,包含了一
1. Spark中的基本概念 在Spark中,有下面的基本概念。 Application:基于Spark的用户程序,包含了一
在前面几篇博客里,介绍了Spark的伪分布式安装,以及使用Spark Shell进行交互式操作,本篇博客主要
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号