spark 序列化
在Spark中,主要有三个涉及到序列化的情况:
- 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输 — 也就是初始化工作是在Driver端进行的,而程序实际运行是在Executor端进行的; 涉及跨进程通信了,所以要进行序列化
- 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
- 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
Kryo序列化框架
Kryo 是一个快速高效的Java对象图形序列化框架,主要特点是性能、高效和易用。该项目用来序列化对象到文件、数据库或者网 络。
但是,它也有一个致命的弱点:生成的byte数据中部包含field数据,对类升级的兼容性很差!所以,若用kryo序列化对象用于C/S架构的话,两边的Class结构要保持一致。
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。
- Kryo速度是Serializable的10倍。
- 当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用kryo来序列化
- 即使使用kryo序列化,也要继承Serializable接口
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/*
* Kryo 序列化 效率是Java序列化的10倍,但不支持全部类型
* */
case class Dog(name:String)
case class Cat(age:Int)
case class Animal(dog: Dog,cat: Cat)
object TestKyroDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("TestKryo")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Animal], classOf[Cat], classOf[Dog]))
//当序列化的类中包含其他引用类型,最好把其他类型也加入进来,没加入进来的类会以全类名方式进行存储
val spark = SparkSession.builder().config(conf).getOrCreate()
val fr = spark.createDataFrame(Seq(new Animal(new Dog("he"),new Cat(22))))
fr.cache().collect()
}
}
参考链接:
https://blog.csdn.net/qq_43288259/article/details/116749301
https://blog.csdn.net/qq_43192537/article/details/110389236