任务不是可序列化的:java.io.NotSerializableException仅在类不是对象时调用函数外的函数

在闭包之外调用函数时出现奇怪的行为:

  • 当函数在一个对象中一切正在工作
  • 当函数在一个类中获得:

    Task not serializable: java.io.NotSerializableException: testing 

问题是我需要我的代码,而不是一个对象。 任何想法为什么发生这种情况? 是一个Scala对象序列化(默认?)?

这是一个工作代码示例:

 object working extends App { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) //calling function outside closure val after = rddList.map(someFunc(_)) def someFunc(a:Int) = a+1 after.collect().map(println(_)) } 

这是非工作的例子:

 object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 } 

我不认为其他答案是完全正确的。 RDD确实是可序列化的 ,所以这不是导致你的任务失败的原因。

Spark是一个分布式计算引擎,其主要抽象是一个弹性分布式数据集( RDD ),可以将其视为分布式集合。 基本上,RDD的元素被划分到集群的节点上,但Spark从用户那里抽象出来,让用户与RDD(集合)交互,就好像它是本地的。

不要进入太多的细节,但是当你在一个RDD( mapflatMapfilter和其他)上运行不同的转换时,你的转换代码(closure)是:

  1. 在驱动程序节点上序列化,
  2. 运送到集群中的相应节点,
  3. 反序列化,
  4. 最后在节点上执行

你当然可以在本地运行(如你的例子),但是所有这些阶段(除了通过networking传送)仍然会发生。 [这可以让你捕捉任何错误,甚至在部署到生产之前]

你的第二种情况会发生什么,就是你正在调用一个方法,在map函数内部的类testing定义。 Spark看到了这一点,并且由于方法不能被序列化,所以Spark试图序列化整个 testing类,以便代码在另一个JVM中执行时仍然可以工作。 你有两种可能性:

要么你让类testing可序列化,所以整个类可以由Spark序列化:

 import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test extends java.io.Serializable { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } def someFunc(a: Int) = a + 1 } 

或者使用someFunc函数而不是方法(函数是Scala中的对象),以便Spark能够序列化它:

 import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } val someFunc = (a: Int) => a + 1 } 

与类序列化相似但并非相同的问题可能会引起您的兴趣,您可以在此Spark Summit 2013演示文稿中阅读它。

作为一个方面说明,你可以重写rddList.map(someFunc(_))rddList.map(someFunc) ,它们是完全一样的。 通常情况下,第二个是首选的,因为它不那么冗长和清晰。

编辑(2015-03-15): SPARK-5307介绍SerializationDebugger和Spark 1.3.0是第一个使用它的版本。 它将序列化path添加到NotSerializableException 。 当遇到一个NotSerializableExceptionexception时,debugging器访问对象图以find不能被序列化的对象的path,并构造信息以帮助用户find对象。

在OP的情况下,这是打印到标准输出:

 Serialization stack: - object not serializable (class: testing, value: testing@2dfe2f00) - field (class: testing$$anonfun$1, name: $outer, type: class testing) - object (class testing$$anonfun$1, <function1>) 

格雷格的答案很好的解释了为什么原来的代码不起作用,有两种方法来解决这个问题。 但是,这个解决scheme不是很灵活, 考虑一下你的闭包包含一个你无法控制的不可Serializable类的方法调用的情况。 你既不能把Serializable标签添加到这个类中,也不能改变底层的实现来把这个方法改变成一个函数。

Nilesh为此提出了一个很好的解决方法,但是解决scheme可以更简洁一般:

 def genMapper[A, B](f: A => B): A => B = { val locker = com.twitter.chill.MeatLocker(f) x => locker.get.apply(x) } 

这个函数序列化程序然后可以用来自动换行闭包和方法调用:

 rdd map genMapper(someFunc) 

这个技术也有不需要额外的Shark依赖来访问KryoSerializationWrapper ,因为Twitter的Chill已经被内核Spark

完整的谈话完全解释了这个问题,它提出了一个很好的范式转换方法来避免这些序列化问题: https : //github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md

顶尖的投票答案基本上是build议扔掉一个完整的语言function – 这是不再使用的方法,只使用function。 事实上,在类中的函数式编程方法应该避免,但把它们转换成函数并不能解决这里的devise问题(参见上面的链接)。

作为在这种特殊情况下的一个快速修复,你可以使用@transient注释来告诉它不要尝试序列化有问题的值(在这里, Spark.ctx是一个自定义的类,而不是按照OP的命名)。

 @transient val rddList = Spark.ctx.parallelize(list) 

你也可以重组代码,使rddList住在其他地方,但这也是讨厌的。

未来可能是孢子

未来斯卡拉将包括这些所谓的“孢子”,应该让我们罚款粮食控制什么,并没有真正被closures拉入。 此外,这应该将所有意外地将不可序列化的types(或任何不需要的值)拉到编译错误中的错误,而不是现在这是可怕的运行时exception/内存泄漏。

http://docs.scala-lang.org/sips/pending/spores.html

Kryo序列化提示

当使用kroro时,注册是必要的,这意味着你得到错误而不是内存泄漏:

“最后,我知道kryo有kryo.setRegistrationOptional(true),但是我正在试图弄清楚如何使用它。当这个选项被打开的时候,如果我还没有注册,kryo似乎仍然会抛出exception类“。

使用kryo注册课程的策略

当然,这只能给你提供types级别的控制,而不是值级别的控制。

…更多的想法来。

我用不同的方法解决了这个问题。 你只需要在通过闭包之前序列化对象,然后反序列化。 即使你的类不是可序列化的,这种方法也是可行的,因为它在幕后使用了Kryo。 所有你需要的是一些咖喱。 ;)

以下是我如何做的一个例子:

 def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) (foo: Foo) : Bar = { kryoWrapper.value.apply(foo) } val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _ rdd.flatMap(mapper).collectAsMap() object Blah(abc: ABC) extends (Foo => Bar) { def apply(foo: Foo) : Bar = { //This is the real function } } 

随意让Blah像你想要的那样复杂,类,伴侣对象,嵌套类,对多个第三方库的引用。

KryoSerializationWrapper指: https : //github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

我不完全确定这适用于Scala,但在Java中,我通过重构我的代码来解决NotSerializableException ,以便闭包不访问不可序列化的final字段。

我面临类似的问题,我从Grega的回答中得到的是

 object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 } 

你的doIT方法试图序列化someFunc(_)方法,但是由于方法不是可序列化的,它会尝试序列化不能再序列化的类testing

所以让你的代码工作,你应该在doIT方法中定义someFunc 。 例如:

 def doIT = { def someFunc(a:Int) = a+1 //function definition } val after = rddList.map(someFunc(_)) after.collect().map(println(_)) } 

如果有多个函数进入图片,那么所有这些函数应该可用于父上下文。