Spark java.lang.OutOfMemoryError:Java堆空间

我的集群:1个主站,11个从站,每个节点有6GB的内存。

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512 

这是问题:

首先 ,我从HDFS读取一些数据(2.19 GB)到RDD:

 val imageBundleRDD = sc.newAPIHadoopFile(...) 

其次 ,在RDD上做一些事情:

 val res = imageBundleRDD.map(data => { val desPoints = threeDReconstruction(data._2, bg) (data._1, desPoints) }) 

最后输出到HDFS:

 res.saveAsNewAPIHadoopFile(...) 

当我运行我的程序时,显示:

 ..... 14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL) 14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms 14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL) 14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms 14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL) Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark] java.lang.OutOfMemoryError: Java heap space 

有太多的任务?

PS :当input数据大约为225 MB时,每件事情都可以。

我该如何解决这个问题?

我有几点build议:

  • 如果你的节点被configuration为Spark的最大值为6g(并且对于其他进程稍微留下一点点),那么使用6g而不是4g, spark.executor.memory=6g 。 通过检查UI 确保你使用了尽可能多的内存 (它会说你使用了多less内存)
  • 尝试使用更多的分区,你应该有每个CPU 2 – 4。 增加分区数量的IME通常是使程序更稳定(通常更快)的最简单方法。 对于大量的数据,你可能需要每个CPU超过4个,在某些情况下,我不得不使用8000个分区!
  • 使用spark.storage.memoryFraction减less为高速caching保留的内存部分 。 如果你不使用cache()或者在你的代码中persist ,这可能是0.它的默认值是0.6,这意味着你只能得到0.4 * 4g的内存。 IME减less记忆经常使得OOM消失。 更新:从火花1.6显然我们将不再需要玩这些值,火花会自动确定它们。
  • 与上述类似,但洗牌记忆分数 。 如果你的工作不需要太多的随机存储器,那么把它设置为一个较低的值(这可能会导致你的洗牌溢出到磁盘上,这会对速度造成灾难性的影响)。 有时,当它是一个洗牌操作时,你需要做相反的事情,比如将其设置为0.8,或者确保允许你的洗牌溢出到磁盘(这是1.0.0之后的默认设置)。
  • 注意内存泄漏 ,这些通常是由于意外closures您在lambdaexpression式中不需要的对象造成的。 诊断的方法是在日志中查找“任务序列化为XXX字节”,如果XXX大于几k或大于MB,则可能有内存泄漏。 请参阅https://stackoverflow.com/a/25270600/1586965
  • 涉及以上; 使用广播variables,如果你真的需要大的对象。
  • 如果要caching大型RDD,并且可能会牺牲一些访问时间,请考虑串行化RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage 。 或者甚至将它们caching到磁盘上(如果使用SSD,有时不会那么糟糕)。
  • 高级 )与上面相关,避免String和重度嵌套的结构(如Map和嵌套案例类)。 如果可能的话,尽量只使用原始types,并索引所有非原语,特别是如果你期望有很多重复的话。 WrappedArray在嵌套结构上selectWrappedArray 。 甚至可以推出自己的序列化 – 你将获得关于如何有效地将数据备份到字节的最多信息, 使用它
  • bit hacky )再次caching时,考虑使用Dataset来caching你的结构,因为它将使用更高效的序列化。 与前面的要点相比,这应该被视为黑客攻击。 将您的领域知识构build到algorithm/序列化中,可以将内存/caching空间最小化为100倍或1000倍,而所有Dataset可能会给内存上的2倍-5倍和磁盘上的10倍压缩(拼块)。

http://spark.apache.org/docs/1.2.1/configuration.html

编辑:(所以我可以谷歌自己更容易)以下也是这个问题的指示:

 java.lang.OutOfMemoryError : GC overhead limit exceeded 

为了添加一个经常不被讨论的用例,我将通过在本地模式下通过spark-submit提交一个Spark应用程序来提供一个解决scheme。

根据Jacek Laskowski的gitbook 掌握Apache Spark :

您可以在本地模式下运行Spark。 在这种非分布式单JVM部署模式下,Spark在同一个JVM中产生所有的执行组件:driver,executor,backend和master。 这是使用驱动程序执行的唯一模式。

因此,如果您在heap遇到OOM错误,则只需调整driver-memory而不是executor-memory driver-memory就足够了。

这里是一个例子:

 spark-1.6.1/bin/spark-submit --class "MyClass" --driver-memory 12g --master local[*] target/scala-2.10/simple-project_2.10-1.0.jar 

看一下启动脚本,在那里设置一个Java堆大小,看起来你在运行Spark worker之前没有设置它。

 # Set SPARK_MEM if it isn't already set since we also use it for this process SPARK_MEM=${SPARK_MEM:-512m} export SPARK_MEM # Set JAVA_OPTS to be able to load native libraries and to set heap size JAVA_OPTS="$OUR_JAVA_OPTS" JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH" JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM" 

您可以在这里find文档来部署脚本。

你应该增加驱动程序的内存。 在你的$ SPARK_HOME / conf文件夹中,你应该find文件spark-defaults.conf ,编辑并设置spark.driver.memory 4000m这取决于你的主内存,我想。 这是为我解决问题,一切顺利

设置内存堆大小的位置(至less在spark-1.0.0中)是在conf / spark-env中。 相关的variables是SPARK_EXECUTOR_MEMORYSPARK_DRIVER_MEMORY 。 部署指南中有更多文档

另外,不要忘记将configuration文件复制到所有从节点。