当前位置:首页 > 开发 > 开源软件 > 正文

【Spark七十八】Spark Kyro序列化

发表于: 2015-02-22   作者:bit1129   来源:转载   浏览:
摘要: 当使用SparkContext的saveAsObjectFile方法将对象序列化到文件,以及通过objectFile方法将对象从文件反序列出来的时候,Spark默认使用Java的序列化以及反序列化机制,通常情况下,这种序列化机制是很低效的,Spark支持使用Kyro作为对象的序列化和反序列化机制,序列化的速度比java更快,但是使用Kyro时要注意,Kyro目前还是有些bug。 Spark

当使用SparkContext的saveAsObjectFile方法将对象序列化到文件,以及通过objectFile方法将对象从文件反序列出来的时候,Spark默认使用Java的序列化以及反序列化机制,通常情况下,这种序列化机制是很低效的,Spark支持使用Kyro作为对象的序列化和反序列化机制,序列化的速度比java更快,但是使用Kyro时要注意,Kyro目前还是有些bug。

  • Spark默认是使用Java的ObjectOutputStream框架,它支持所有的继承于java.io.Serializable序列化,如果想要进行调优的话,可以通过继承java.io.Externalizable。这种格式比较大,而且速度慢。
  • Spark还支持这种方式Kryo serialization,它的速度快,而且压缩比高于Java的序列化,但是它不支持所有的Serializable格式,并且需要在程序里面注册。它需要在实例化SparkContext之前进行注册
When Spark is transferring data over the network or spilling data to disk, it needs to
serialize objects into a binary format. This comes into play during shuffle operations,
where potentially large amounts of data are transferred. By default Spark will use
Java’s built-in serializer. Spark also supports the use of Kryo, a third-party serialization
library that improves on Java’s serialization by offering both faster serialization
times and a more compact binary representation, but cannot serialize all types of
objects “out of the box.” Almost all applications will benefit from shifting to Kryo for
serialization.

 

 

代码示例:

package spark.examples.kryo

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.serializer.KryoRegistrator

//两个成员变量name和age,同时必须实现java.io.Serializable接口
class MyClass1(val name: String, val age: Int) extends java.io.Serializable {
}

//两个成员变量name和age,同时必须实现java.io.Serializable接口
class MyClass2(val name: String, val age: Int) extends java.io.Serializable {

}

//注册使用Kryo序列化的类,要求MyClass1和MyClass2必须实现java.io.Serializable
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClass1]);
    kryo.register(classOf[MyClass2]);
  }
}

object SparkKryo {
  def main(args: Array[String]) {
    //设置序列化器为KryoSerializer,也可以在配置文件中进行配置
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator")
    val conf = new SparkConf()
    conf.setAppName("SparkKryo")
    conf.setMaster("local[3]")

    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(new MyClass1("Tom", 31), new MyClass1("Jack", 23), new MyClass1("Mary", 19)))
    val fileDir = "file:///d:/wordcount" + System.currentTimeMillis()
    //将rdd中的对象通过kyro进行序列化,保存到fileDir目录中
    rdd.saveAsObjectFile(fileDir)
    //读取part-00000文件中的数据,对它进行反序列化,,得到对象集合rdd1
    val rdd1 = sc.objectFile[MyClass1](fileDir + "/" + "part-00000")


    rdd1.foreachPartition(iter => {
      while (iter.hasNext) {
        val objOfMyClass1 = iter.next();
        println(objOfMyClass1.name)
      }
    })

    sc.stop
  }
}

 

查看保存到文件中的内容,是个二进制数据:

SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable      蓑_xi??蛔?z汲   i       e ur [Lspark.examples.kryo.MyClass1;?
独#v?  xp   sr spark.examples.kryo.MyClass1z 峌#   xp

 

 

问题:

对于普通字符,数字,字符串写入到object文件,是否也是序列化的过程?明确指定使用kvro序列化Int之后,保存的文件确实是二进制的。去掉对Int的注册之后,结果还是一样,序列化的结果完全一样,结果都是:

SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable      F脗?庻籡陭姯&?  '       # ur [IM篳&v瓴?  xp         

 

 

 

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.serializer.KryoRegistrator

//注册使用Kryo序列化的类,对Int进行序列化
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Int]);
  }
}

object SparkKryoPrimitiveType {
  def main(args: Array[String]) {
    //设置序列化器为KryoSerializer,也可以在配置文件中进行配置
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator")
    val conf = new SparkConf()
    conf.setAppName("SparkKryoPrimitiveType")
    conf.setMaster("local[3]")

    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1, 3, 7, 9, 11, 22))
    val fileDir = "file:///d:/wordcount" + System.currentTimeMillis()
    //将rdd中的对象通过kyro进行序列化,保存到fileDir目录中
    rdd.saveAsObjectFile(fileDir)
    //读取part-00000文件中的数据,对它进行反序列化,,得到对象集合rdd1
    val rdd1 = sc.objectFile[Int](fileDir + "/" + "part-00000")

    rdd1.foreachPartition(iter => {
      while (iter.hasNext) {
        println(iter.next())
      }
    })

    sc.stop
  }
}

 

 其它:

指定使用Kyro序列化,以及注册Kyro序列化类,可可以使用如下方式

val conf = new SparkConf()
//这句是多于的,调用conf的registerKryoClasses时,已经设置了序列化方法
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Be strict about class registration
///如果一个要序列化的类没有进行Kryo注册,则强制Spark报错
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

 

  /**
   * Use Kryo serialization and register the given set of classes with Kryo.
   * If called multiple times, this will append the classes from all calls together.
   */
  def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
    val allClassNames = new LinkedHashSet[String]()
    allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
    allClassNames ++= classes.map(_.getName)

    set("spark.kryo.classesToRegister", allClassNames.mkString(","))
    set("spark.serializer", classOf[KryoSerializer].getName)
    this
  }

 

 

【Spark七十八】Spark Kyro序列化

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
1 spark
环境: Hadoop版本:Apache Hadoop2.7.1 Spark版本:Apache Spark1.4.1 核心代码: 测试数据: Java
在前面几篇博客里,介绍了Spark的伪分布式安装,以及使用Spark Shell进行交互式操作,本篇博客主要
在前面几篇博客里,介绍了Spark的伪分布式安装,以及使用Spark Shell进行交互式操作,本篇博客主要
Spark Streaming uses a “micro-batch” architecture, where the streaming computation is treat
http://book.51cto.com/art/201408/448416.htm 一、如何实现多台机器的ssh无密码登录 当我们在配置
引入 一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。 面向大规模数据分
MapReduce中的Shuffle 在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Re
Spark专用名词 RDD —- resillient distributed dataset 弹性分布式数据集 Operation —- 作用于RDD
开发Spark WordCount的步骤 下载并配置Scala2.11.4 下载Scala版本的Eclipse,简称Scala IDE 下载Spa
什么是RDD Spark是围绕着RDD(Resilient Distributed Dataset,弹性分布式数据集)建立起来的,也就是
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号