(为什么)我们需要调用caching还是坚持RDD
当从文本文件或集合(或从另一个RDD)创build弹性分布式数据集(RDD)时,是否需要显式调用“cache”或“persist”以将RDD数据存储到内存中? 或者默认情况下RDD数据是以分布的方式存储在内存中的?
val textFile = sc.textFile("/user/emp.txt")
根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在所有/某些节点的内存中使用。
如果是这样,为什么我们需要在textFile RDD上调用“cache”或“persist”呢?
大多数RDD操作是懒惰的。 把RDD看作是对一系列操作的描述。 RDD不是数据。 所以这一行:
val textFile = sc.textFile("/user/emp.txt")
它什么都不做。 它创build一个RDD,表示“我们将需要加载这个文件”。 这个文件没有加载。
需要观察数据内容的RDD操作不能懒惰。 (这些被称为动作 。)一个例子是RDD.count
– 告诉你文件中的行数,这个文件需要被读取。 所以,如果你写了textFile.count
,在这一点上文件将被读取,行将被计数,并计数将被返回。
如果你再次调用textFile.count
呢? 同样的事情:文件将被读取并重新计数。 没有储存。 RDD不是数据。
那么RDD.cache
做什么? 如果你添加textFile.cache
到上面的代码:
val textFile = sc.textFile("/user/emp.txt") textFile.cache
它什么都不做。 RDD.cache
也是一个懒惰的操作。 该文件仍然不被读取。 但是现在RDD说:“读取这个文件然后caching内容”。 如果你第一次运行textFile.count
,文件将被加载,caching和计数。 如果textFile.count
调用textFile.count
,操作将使用caching。 它将只从caching中取数据并计算行数。
caching行为取决于可用内存。 例如,如果该文件不适合内存,则textFile.count
将回退到通常的行为并重新读取该文件。
我认为这个问题会更好的表述为:
我们什么时候需要调用caching或坚持RDD?
Spark进程是懒惰的,也就是说,除非需要,否则什么都不会发生。 要快速回答这个问题,在发布val textFile = sc.textFile("/user/emp.txt")
,没有任何事情发生,只有一个HadoopRDD
被构造,使用该文件作为源。
比方说,我们稍微转换一下这些数据:
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
再次,数据没有任何反应。 现在有一个新的RDD wordsRDD
,它包含对testFile
的引用以及在需要时应用的函数。
只有在RDD上调用某个动作时,才能执行名为lineage的RDD链,如wordsRDD.count
。 也就是说,在分区中分解的数据将由Spark集群的执行者加载, flatMap
函数将被应用并且结果将被计算。
在线性谱系中,就像这个例子中的那样,不需要cache()
。 数据将被加载到执行程序,所有的转换将被应用,最后计算所有的内存 – 如果数据适合内存。
当RDD沿袭分支时, cache
是有用的。 假设你想把前一个例子中的单词过滤为正数和负数的单词。 你可以这样做:
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
在这里,每个分支发出数据重新加载。 添加一个显式的cache
语句将确保以前完成的处理被保留并重用。 这个工作看起来像这样:
val textFile = sc.textFile("/user/emp.txt") val wordsRDD = textFile.flatMap(line => line.split("\\W")) wordsRDD.cache() val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
出于这个原因, cache
被认为是“打破血统”,因为它创build了一个可以重新用于进一步处理的检查点。
经验法则:当你的RDD的血统分支出来,或者当一个RDD被多次使用时,像循环一样使用cache
。
我们是否需要明确地调用“cache”或“persist”来将RDD数据存储到内存中?
是的,只有在需要的时候。
RDD数据默认以分布的方式存储在内存中?
没有!
这就是为什么:
-
Spark支持两种types的共享variables:广播variables,可用于在所有节点上caching内存中的值,以及累加器,这些variables只是“添加”到的variables,如计数器和总和。
-
RDD支持两种types的操作:转换(从现有的数据集创build新的数据集)和操作(在数据集上运行计算后将值返回给驱动程序)。 例如,map是一个通过函数传递每个数据集元素的变换,并返回一个代表结果的新RDD。 另一方面,reduce是一个动作,它使用某个函数聚合RDD的所有元素,并将最终结果返回给驱动程序(尽pipe还有一个并行reduceByKey返回一个分布式数据集)。
-
Spark中的所有转换都是懒惰的,因为它们不会马上计算结果。 相反,他们只记得应用于某些基础数据集(例如文件)的转换。 只有在动作需要将结果返回给驱动程序时才会计算转换。 这种devise使得Spark能够更高效地运行 – 例如,我们可以意识到通过map创build的数据集将被用于reduce,并且只会将reduce的结果返回给驱动程序,而不是返回更大的映射数据集。
-
默认情况下,每次对其执行操作时,每个已转换的RDD都可能重新计算。 但是,您也可以使用持久化(或caching)方法将RDD保留在内存中,在这种情况下,Spark将保留群集中的元素,以便在下次查询时快速访问。 还支持在磁盘上持久化RDD,或在多个节点上复制RDD。
有关更多详细信息,请查看Spark编程指南 。
添加另一个添加(或临时添加) cache
方法调用的原因。
用于debugging内存问题
使用cache
方法,spark会给出有关RDD大小的debugging信息。 所以在火花集成UI中,您将获得RDD内存消耗信息。 这对于诊断内存问题非常有帮助。
以下是您应该cachingRDD的三种情况:
多次使用RDD
在相同的RDD上执行多个操作
长链(或非常昂贵的)转换