Spark java.lang.OutOfMemoryError:Java堆空间
我的集群:1个主站,11个从站,每个节点有6GB的内存。
我的设置:
spark.executor.memory=4g, Dspark.akka.frameSize=512
这是问题:
首先 ,我从HDFS读取一些数据(2.19 GB)到RDD:
val imageBundleRDD = sc.newAPIHadoopFile(...)
其次 ,在RDD上做一些事情:
val res = imageBundleRDD.map(data => { val desPoints = threeDReconstruction(data._2, bg) (data._1, desPoints) })
最后输出到HDFS:
res.saveAsNewAPIHadoopFile(...)
当我运行我的程序时,显示:
..... 14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL) 14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms 14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL) 14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms 14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL) Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark] java.lang.OutOfMemoryError: Java heap space
有太多的任务?
PS :当input数据大约为225 MB时,每件事情都可以。
我该如何解决这个问题?
我有几点build议:
- 如果你的节点被configuration为Spark的最大值为6g(并且对于其他进程稍微留下一点点),那么使用6g而不是4g,
spark.executor.memory=6g
。 通过检查UI 来确保你使用了尽可能多的内存 (它会说你使用了多less内存) - 尝试使用更多的分区,你应该有每个CPU 2 – 4。 增加分区数量的IME通常是使程序更稳定(通常更快)的最简单方法。 对于大量的数据,你可能需要每个CPU超过4个,在某些情况下,我不得不使用8000个分区!
- 使用
spark.storage.memoryFraction
减less为高速caching保留的内存部分 。 如果你不使用cache()
或者在你的代码中persist
,这可能是0.它的默认值是0.6,这意味着你只能得到0.4 * 4g的内存。 IME减less记忆经常使得OOM消失。 更新:从火花1.6显然我们将不再需要玩这些值,火花会自动确定它们。 - 与上述类似,但洗牌记忆分数 。 如果你的工作不需要太多的随机存储器,那么把它设置为一个较低的值(这可能会导致你的洗牌溢出到磁盘上,这会对速度造成灾难性的影响)。 有时,当它是一个洗牌操作时,你需要做相反的事情,比如将其设置为0.8,或者确保允许你的洗牌溢出到磁盘(这是1.0.0之后的默认设置)。
- 注意内存泄漏 ,这些通常是由于意外closures您在lambdaexpression式中不需要的对象造成的。 诊断的方法是在日志中查找“任务序列化为XXX字节”,如果XXX大于几k或大于MB,则可能有内存泄漏。 请参阅https://stackoverflow.com/a/25270600/1586965
- 涉及以上; 使用广播variables,如果你真的需要大的对象。
- 如果要caching大型RDD,并且可能会牺牲一些访问时间,请考虑串行化RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage 。 或者甚至将它们caching到磁盘上(如果使用SSD,有时不会那么糟糕)。
- ( 高级 )与上面相关,避免
String
和重度嵌套的结构(如Map
和嵌套案例类)。 如果可能的话,尽量只使用原始types,并索引所有非原语,特别是如果你期望有很多重复的话。WrappedArray
在嵌套结构上selectWrappedArray
。 甚至可以推出自己的序列化 – 你将获得关于如何有效地将数据备份到字节的最多信息, 使用它 ! - ( bit hacky )再次caching时,考虑使用
Dataset
来caching你的结构,因为它将使用更高效的序列化。 与前面的要点相比,这应该被视为黑客攻击。 将您的领域知识构build到algorithm/序列化中,可以将内存/caching空间最小化为100倍或1000倍,而所有Dataset
可能会给内存上的2倍-5倍和磁盘上的10倍压缩(拼块)。
http://spark.apache.org/docs/1.2.1/configuration.html
编辑:(所以我可以谷歌自己更容易)以下也是这个问题的指示:
java.lang.OutOfMemoryError : GC overhead limit exceeded
为了添加一个经常不被讨论的用例,我将通过在本地模式下通过spark-submit
提交一个Spark
应用程序来提供一个解决scheme。
根据Jacek Laskowski的gitbook 掌握Apache Spark :
您可以在本地模式下运行Spark。 在这种非分布式单JVM部署模式下,Spark在同一个JVM中产生所有的执行组件:driver,executor,backend和master。 这是使用驱动程序执行的唯一模式。
因此,如果您在heap
遇到OOM
错误,则只需调整driver-memory
而不是executor-memory
driver-memory
就足够了。
这里是一个例子:
spark-1.6.1/bin/spark-submit --class "MyClass" --driver-memory 12g --master local[*] target/scala-2.10/simple-project_2.10-1.0.jar
看一下启动脚本,在那里设置一个Java堆大小,看起来你在运行Spark worker之前没有设置它。
# Set SPARK_MEM if it isn't already set since we also use it for this process SPARK_MEM=${SPARK_MEM:-512m} export SPARK_MEM # Set JAVA_OPTS to be able to load native libraries and to set heap size JAVA_OPTS="$OUR_JAVA_OPTS" JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH" JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
您可以在这里find文档来部署脚本。
你应该增加驱动程序的内存。 在你的$ SPARK_HOME / conf文件夹中,你应该find文件spark-defaults.conf
,编辑并设置spark.driver.memory 4000m
这取决于你的主内存,我想。 这是为我解决问题,一切顺利
设置内存堆大小的位置(至less在spark-1.0.0中)是在conf / spark-env中。 相关的variables是SPARK_EXECUTOR_MEMORY
& SPARK_DRIVER_MEMORY
。 部署指南中有更多文档
另外,不要忘记将configuration文件复制到所有从节点。