(为什么)我们需要调用caching还是坚持RDD

当从文本文件或集合(或从另一个RDD)创build弹性分布式数据集(RDD)时,是否需要显式调用“cache”或“persist”以将RDD数据存储到内存中? 或者默认情况下RDD数据是以分布的方式存储在内存中的?

val textFile = sc.textFile("/user/emp.txt") 

根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在所有/某些节点的内存中使用。

如果是这样,为什么我们需要在textFile RDD上调用“cache”或“persist”呢?

大多数RDD操作是懒惰的。 把RDD看作是对一系列操作的描述。 RDD不是数据。 所以这一行:

 val textFile = sc.textFile("/user/emp.txt") 

它什么都不做。 它创build一个RDD,表示“我们将需要加载这个文件”。 这个文件没有加载。

需要观察数据内容的RDD操作不能懒惰。 (这些被称为动作 。)一个例子是RDD.count – 告诉你文件中的行数,这个文件需要被读取。 所以,如果你写了textFile.count ,在这一点上文件将被读取,行将被计数,并计数将被返回。

如果你再次调用textFile.count呢? 同样的事情:文件将被读取并重新计数。 没有储存。 RDD不是数据。

那么RDD.cache做什么? 如果你添加textFile.cache到上面的代码:

 val textFile = sc.textFile("/user/emp.txt") textFile.cache 

它什么都不做。 RDD.cache也是一个懒惰的操作。 该文件仍然不被读取。 但是现在RDD说:“读取这个文件然后caching内容”。 如果你第一次运行textFile.count ,文件将被加载,caching和计数。 如果textFile.count调用textFile.count ,操作将使用caching。 它将只从caching中取数据并计算行数。

caching行为取决于可用内存。 例如,如果该文件不适合内存,则textFile.count将回退到通常的行为并重新读取该文件。

我认为这个问题会更好的表述为:

我们什么时候需要调用caching或坚持RDD?

Spark进程是懒惰的,也就是说,除非需要,否则什么都不会发生。 要快速回答这个问题,在发布val textFile = sc.textFile("/user/emp.txt") ,没有任何事情发生,只有一个HadoopRDD被构造,使用该文件作为源。

比方说,我们稍微转换一下这些数据:

 val wordsRDD = textFile.flatMap(line => line.split("\\W")) 

再次,数据没有任何反应。 现在有一个新的RDD wordsRDD ,它包含对testFile的引用以及在需要时应用的函数。

只有在RDD上调用某个动作时,才能执行名为lineage的RDD链,如wordsRDD.count 。 也就是说,在分区中分解的数据将由Spark集群的执行者加载, flatMap函数将被应用并且结果将被计算。

在线性谱系中,就像这个例子中的那样,不需要cache() 。 数据将被加载到执行程序,所有的转换将被应用,最后计算所有的内存 – 如果数据适合内存。

当RDD沿袭分支时, cache是有用的。 假设你想把前一个例子中的单词过滤为正数和负数的单词。 你可以这样做:

 val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count() 

在这里,每个分支发出数据重新加载。 添加一个显式的cache语句将确保以前完成的处理被保留并重用。 这个工作看起来像这样:

 val textFile = sc.textFile("/user/emp.txt") val wordsRDD = textFile.flatMap(line => line.split("\\W")) wordsRDD.cache() val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count() 

出于这个原因, cache被认为是“打破血统”,因为它创build了一个可以重新用于进一步处理的检查点。

经验法则:当你的RDD的血统分支出来,或者当一个RDD被多次使用时,像循环一样使用cache

我们是否需要明确地调用“cache”或“persist”来将RDD数据存储到内存中?

是的,只有在需要的时候。

RDD数据默认以分布的方式存储在内存中?

没有!

这就是为什么:

  • Spark支持两种types的共享variables:广播variables,可用于在所有节点上caching内存中的值,以及累加器,这些variables只是“添加”到的variables,如计数器和总和。

  • RDD支持两种types的操作:转换(从现有的数据集创build新的数据集)和操作(在数据集上运行计算后将值返回给驱动程序)。 例如,map是一个通过函数传递每个数据集元素的变换,并返回一个代表结果的新RDD。 另一方面,reduce是一个动作,它使用某个函数聚合RDD的所有元素,并将最终结果返回给驱动程序(尽pipe还有一个并行reduceByKey返回一个分布式数据集)。

  • Spark中的所有转换都是懒惰的,因为它们不会马上计算结果。 相反,他们只记得应用于某些基础数据集(例如文件)的转换。 只有在动作需要将结果返回给驱动程序时才会计算转换。 这种devise使得Spark能够更高效地运行 – 例如,我们可以意识到通过map创build的数据集将被用于reduce,并且只会将reduce的结果返回给驱动程序,而不是返回更大的映射数据集。

  • 默认情况下,每次对其执行操作时,每个已转换的RDD都可能重新计算。 但是,您也可以使用持久化(或caching)方法将RDD保留在内存中,在这种情况下,Spark将保留群集中的元素,以便在下次查询时快速访问。 还支持在磁盘上持久化RDD,或在多个节点上复制RDD。

有关更多详细信息,请查看Spark编程指南 。

添加另一个添加(或临时添加) cache方法调用的原因。

用于debugging内存问题

使用cache方法,spark会给出有关RDD大小的debugging信息。 所以在火花集成UI中,您将获得RDD内存消耗信息。 这对于诊断内存问题非常有帮助。

以下是您应该cachingRDD的三种情况:

多次使用RDD

在相同的RDD上执行多个操作

长链(或非常昂贵的)转换