任务不是可序列化的:java.io.NotSerializableException仅在类不是对象时调用函数外的函数
在闭包之外调用函数时出现奇怪的行为:
- 当函数在一个对象中一切正在工作
-
当函数在一个类中获得:
Task not serializable: java.io.NotSerializableException: testing
问题是我需要我的代码,而不是一个对象。 任何想法为什么发生这种情况? 是一个Scala对象序列化(默认?)?
这是一个工作代码示例:
object working extends App { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) //calling function outside closure val after = rddList.map(someFunc(_)) def someFunc(a:Int) = a+1 after.collect().map(println(_)) }
这是非工作的例子:
object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 }
我不认为其他答案是完全正确的。 RDD确实是可序列化的 ,所以这不是导致你的任务失败的原因。
Spark是一个分布式计算引擎,其主要抽象是一个弹性分布式数据集( RDD ),可以将其视为分布式集合。 基本上,RDD的元素被划分到集群的节点上,但Spark从用户那里抽象出来,让用户与RDD(集合)交互,就好像它是本地的。
不要进入太多的细节,但是当你在一个RDD( map
, flatMap
, filter
和其他)上运行不同的转换时,你的转换代码(closure)是:
- 在驱动程序节点上序列化,
- 运送到集群中的相应节点,
- 反序列化,
- 最后在节点上执行
你当然可以在本地运行(如你的例子),但是所有这些阶段(除了通过networking传送)仍然会发生。 [这可以让你捕捉任何错误,甚至在部署到生产之前]
你的第二种情况会发生什么,就是你正在调用一个方法,在map函数内部的类testing
定义。 Spark看到了这一点,并且由于方法不能被序列化,所以Spark试图序列化整个 testing
类,以便代码在另一个JVM中执行时仍然可以工作。 你有两种可能性:
要么你让类testing可序列化,所以整个类可以由Spark序列化:
import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test extends java.io.Serializable { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } def someFunc(a: Int) = a + 1 }
或者使用someFunc
函数而不是方法(函数是Scala中的对象),以便Spark能够序列化它:
import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } val someFunc = (a: Int) => a + 1 }
与类序列化相似但并非相同的问题可能会引起您的兴趣,您可以在此Spark Summit 2013演示文稿中阅读它。
作为一个方面说明,你可以重写rddList.map(someFunc(_))
到rddList.map(someFunc)
,它们是完全一样的。 通常情况下,第二个是首选的,因为它不那么冗长和清晰。
编辑(2015-03-15): SPARK-5307介绍SerializationDebugger和Spark 1.3.0是第一个使用它的版本。 它将序列化path添加到NotSerializableException 。 当遇到一个NotSerializableExceptionexception时,debugging器访问对象图以find不能被序列化的对象的path,并构造信息以帮助用户find对象。
在OP的情况下,这是打印到标准输出:
Serialization stack: - object not serializable (class: testing, value: testing@2dfe2f00) - field (class: testing$$anonfun$1, name: $outer, type: class testing) - object (class testing$$anonfun$1, <function1>)
格雷格的答案很好的解释了为什么原来的代码不起作用,有两种方法来解决这个问题。 但是,这个解决scheme不是很灵活, 考虑一下你的闭包包含一个你无法控制的不可Serializable
类的方法调用的情况。 你既不能把Serializable
标签添加到这个类中,也不能改变底层的实现来把这个方法改变成一个函数。
Nilesh为此提出了一个很好的解决方法,但是解决scheme可以更简洁一般:
def genMapper[A, B](f: A => B): A => B = { val locker = com.twitter.chill.MeatLocker(f) x => locker.get.apply(x) }
这个函数序列化程序然后可以用来自动换行闭包和方法调用:
rdd map genMapper(someFunc)
这个技术也有不需要额外的Shark依赖来访问KryoSerializationWrapper
,因为Twitter的Chill已经被内核Spark
完整的谈话完全解释了这个问题,它提出了一个很好的范式转换方法来避免这些序列化问题: https : //github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md
顶尖的投票答案基本上是build议扔掉一个完整的语言function – 这是不再使用的方法,只使用function。 事实上,在类中的函数式编程方法应该避免,但把它们转换成函数并不能解决这里的devise问题(参见上面的链接)。
作为在这种特殊情况下的一个快速修复,你可以使用@transient
注释来告诉它不要尝试序列化有问题的值(在这里, Spark.ctx
是一个自定义的类,而不是按照OP的命名)。
@transient val rddList = Spark.ctx.parallelize(list)
你也可以重组代码,使rddList住在其他地方,但这也是讨厌的。
未来可能是孢子
未来斯卡拉将包括这些所谓的“孢子”,应该让我们罚款粮食控制什么,并没有真正被closures拉入。 此外,这应该将所有意外地将不可序列化的types(或任何不需要的值)拉到编译错误中的错误,而不是现在这是可怕的运行时exception/内存泄漏。
http://docs.scala-lang.org/sips/pending/spores.html
Kryo序列化提示
当使用kroro时,注册是必要的,这意味着你得到错误而不是内存泄漏:
“最后,我知道kryo有kryo.setRegistrationOptional(true),但是我正在试图弄清楚如何使用它。当这个选项被打开的时候,如果我还没有注册,kryo似乎仍然会抛出exception类“。
使用kryo注册课程的策略
当然,这只能给你提供types级别的控制,而不是值级别的控制。
…更多的想法来。
我用不同的方法解决了这个问题。 你只需要在通过闭包之前序列化对象,然后反序列化。 即使你的类不是可序列化的,这种方法也是可行的,因为它在幕后使用了Kryo。 所有你需要的是一些咖喱。 ;)
以下是我如何做的一个例子:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) (foo: Foo) : Bar = { kryoWrapper.value.apply(foo) } val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _ rdd.flatMap(mapper).collectAsMap() object Blah(abc: ABC) extends (Foo => Bar) { def apply(foo: Foo) : Bar = { //This is the real function } }
随意让Blah像你想要的那样复杂,类,伴侣对象,嵌套类,对多个第三方库的引用。
KryoSerializationWrapper指: https : //github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
我不完全确定这适用于Scala,但在Java中,我通过重构我的代码来解决NotSerializableException
,以便闭包不访问不可序列化的final
字段。
我面临类似的问题,我从Grega的回答中得到的是
object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 }
你的doIT方法试图序列化someFunc(_)方法,但是由于方法不是可序列化的,它会尝试序列化不能再序列化的类testing 。
所以让你的代码工作,你应该在doIT方法中定义someFunc 。 例如:
def doIT = { def someFunc(a:Int) = a+1 //function definition } val after = rddList.map(someFunc(_)) after.collect().map(println(_)) }
如果有多个函数进入图片,那么所有这些函数应该可用于父上下文。