Apache Spark:核心数量与执行者数量
我试图了解在YARN上运行Spark作业时核心数量和执行者数量之间的关系。
testing环境如下:
- 数据节点数量:3
- 数据节点机器规格:
- CPU:Core i7-4790(内核数量:4,线程数量:8)
- 内存:32GB(8GB x 4)
- HDD:8TB(2TB x 4)
-
networking:1Gb
-
Spark版本:1.0.0
-
Hadoop版本:2.4.0(Hortonworks HDP 2.1)
-
Spark工作stream程:sc.textFile – > filter – > map – > filter – > mapToPair – > reduceByKey – > map – > saveAsTextFile
-
input数据
- types:单个文本文件
- 大小:165GB
- 行数:454,568,833
-
产量
- 第二次过滤后的行数:310,640,717
- 结果文件的行数:99848268
- 结果文件的大小:41GB
该作业运行以下configuration:
-
--master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3
每个数据节点的执行程序,尽可能多地使用内核) -
--master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3
(减less了内核数量) -
--master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12
(less核心,多执行程序)
已用时间:
-
50分15秒
-
55分48秒
-
31分23秒
令我惊讶的是,(3)要快得多。
我认为(1)会更快,因为洗牌时执行者间的沟通会less一些。
尽pipe(1)的核心数量less于(3),但核心数量并不是关键因素,因为2)performance良好。
(在pwilmot的回答后添加了下面的内容。)
有关信息,性能监视器屏幕截图如下所示:
- (1)的Ganglia数据节点摘要 – 作业于04:37开始。
- (3)的Ganglia数据节点摘要 – 工作在19:47开始。 在此之前请忽略图表。
该图大致分为2个部分:
- 第一:从开始reduceByKey:CPU密集型,没有networking活动
- 第二:reduceByKey:CPU降低后,networkingI / O完成。
如图所示,(1)可以使用尽可能多的CPU功率。 所以,这可能不是线程数量的问题。
如何解释这个结果?
为了让所有这些更具体一些,下面是configurationSpark应用程序以尽可能多地使用集群的一个工作示例:想象一下,具有六个节点的集群运行NodeManagers,每个节点配备16个内核和64GB内存 。 NodeManager容量yarn.nodemanager.resource.memory-mb和yarn.nodemanager.resource.cpu-vcores应该分别设置为63 * 1024 = 64512(兆字节)和15。 我们避免将100%的资源分配给YARN容器,因为节点需要一些资源来运行OS和Hadoop守护进程。 在这种情况下,我们为这些系统进程留下了一个千兆字节和一个核心。 Cloudera Manager通过对这些内容进行计算并自动configuration这些YARN属性来提供帮助。
可能的第一个冲动是使用–num-executors 6 –executor-cores 15 –executor-memory 63G 。 但是,这是错误的方法,因为:
63GB +的执行程序内存开销将不符合NodeManager的63GB容量。 应用程序主节点将在其中一个节点上占用一个核心,这意味着该节点上的15核执行程序不会有空间。 每个执行程序15个内核可能导致HDFS I / O吞吐量不佳。
更好的select是使用–num-executors 17 –executor-cores 5 –executor-memory 19G 。 为什么?
这个configuration会导致除了AM之外的所有节点上的三个执行者,这两个执行者将有两个执行者。 – 执行程序的内存被派生为(每个节点63/3个执行程序)= 21.21 * 0.07 = 1.47。 21 – 1.47〜19。
这个解释是在cloudera的博客http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
我自己并没有使用这些设置,所以这只是猜测,但是如果我们将这个问题看作是分布式系统中的普通内核和线程,那么在您的集群中,最多可以使用12个内核(4 * 3个机器)和24个线程(8 * 3机器)。 在前两个示例中,您将为您的作业提供相当数量的内核(可能的计算空间),但在这些内核上运行的线程(作业)数量非常有限,以至于无法使用大部分分配的处理能力即使分配了更多的计算资源,工作也会变慢。
你提到你关心的是洗牌步骤 – 虽然在洗牌步骤中限制开销是很好的,但是利用集群的并行化通常要重要得多。 想想极端的情况 – 一个单线程的零洗牌程序。
据Sandy Ryza介绍 ,当你在HDFS上运行你的Spark应用程序时,
我注意到HDFS客户端遇到大量的并发线程。 一个粗略的猜测是, 每个执行者最多有5个任务可以达到完全的写入吞吐量,所以最好将每个执行者的核心数量保持在这个数量以下。
所以我相信你的第一个configuration比第三个configuration慢是因为HDFS I / O吞吐量不好
我认为其中一个主要原因是地方性的。 你的input文件大小为165G,这个文件的相关块肯定分布在多个DataNode上,更多的执行者可以避免networking拷贝。
尝试设置执行者数量相等块数,我认为可以更快。
Sparkdynamic分配提供了灵活性并dynamic分配资源。 在这个数量的最小和最大的执行者可以给。 另外还可以给出应用程序启动时需要启动的执行程序的数量。
阅读下面的相同:
http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
我认为前两种configuration有一个小问题。 线程和核心的概念如下。 线程的概念是如果内核是理想的,那么使用该内核来处理数据。 所以在前两种情况下内存没有得到充分利用。 如果你想在这个例子中select在每台机器上有10个以上内核的机器。 然后做基准标记。
但不要给每个执行器5个以上的内核,I / O性能将会是瓶颈。
所以做这个基准标记的最好的机器可能是有10个内核的数据节点。
数据节点机器规格:CPU:Core i7-4790(核心数量:10,线程数量:20)RAM:32GB(8GB x 4)HDD:8TB(2TB x 4)