Why do we need to call cache or persist on an RDD?
RDD operations are lazy. Think of an RDD as a description of a series of operations; an RDD is not data. So this line:
val textFile = sc.textFile("/user/emp.txt")
does nothing by itself. It creates an RDD that says “we will need to load this file”. The file is not loaded at this point.
RDD operations that need to observe the contents of the data cannot be lazy. (These are called actions.) An example is RDD.count — to tell you the number of lines in the file, the file needs to be read. So if you write textFile.count, at that point the file will be read, the lines will be counted, and the count will be returned.
val textFile = sc.textFile("/user/emp.txt")
textFile.cache
This, too, does nothing. RDD.cache is also a lazy operation; the file is still not read. But now the RDD says “read this file and then cache the contents”. When you run textFile.count the first time, the file will be loaded, cached, and counted. If you call textFile.count a second time, the operation will use the cache: it will just take the data from the cache and count the lines.
The cache behavior depends on the available memory. If the file does not fit in memory, for instance, then textFile.count will fall back to the usual behavior and re-read the file.
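If you want different behavior when memory runs out, persist lets you choose a storage level explicitly. A minimal sketch, assuming a running SparkContext sc and the same file path as above:

```scala
import org.apache.spark.storage.StorageLevel

// Like cache, persist is lazy: it only records the desired storage level.
// MEMORY_AND_DISK spills partitions that do not fit in memory to disk,
// instead of recomputing them from the file on the next action.
val textFile = sc.textFile("/user/emp.txt")
textFile.persist(StorageLevel.MEMORY_AND_DISK)

textFile.count() // first action: reads the file, caching/spilling partitions
textFile.count() // second action: served from memory (or disk), not re-read
```

cache() is simply shorthand for persist(StorageLevel.MEMORY_ONLY).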
Spark processing is lazy; that is, nothing happens until it is required. To answer the question quickly: after val textFile = sc.textFile("/user/emp.txt") is issued, nothing happens to the data — only a HadoopRDD is constructed, using the file as its source.
Now, let’s transform that data a bit:
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
Again, nothing happens to the data. Now there’s a new RDD, wordsRDD, that contains a reference to textFile and a function to be applied when needed.
Only when an action is called on an RDD, like wordsRDD.count, is the chain of RDDs, called the lineage, executed. That is, the data, broken down into partitions, will be loaded by the Spark cluster’s executors, the flatMap function will be applied, and the result will be computed.
On a linear lineage, like the one in this example, cache() is not needed. The data will be loaded into the executors, all the transformations will be applied, and finally the count will be computed, all in memory, provided the data fits.
cache is useful when the lineage of the RDD branches out. Let’s say you want to filter the words of the previous example into counts of positive and negative words:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
Here, each branch triggers a reload of the data. Adding an explicit cache call ensures that the processing done earlier is preserved and reused. The job will look like this:
val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
For this reason, cache is said to ‘break the lineage’, as it creates a checkpoint that can be reused for further processing.
Note: cache is useful when the lineage of your RDD branches out, or when an RDD is used multiple times, as in a loop.
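To illustrate the loop case: an iterative job that reuses the same RDD on every pass benefits from caching it once up front. A hypothetical sketch (the word list is an assumption for illustration):

```scala
// Without cache(), every iteration would re-read and re-split the file,
// because each count() action replays the whole lineage from the source.
val wordsRDD = sc.textFile("/user/emp.txt").flatMap(line => line.split("\\W"))
wordsRDD.cache()

val lookups = Seq("error", "warning", "info") // hypothetical words to count
for (w <- lookups) {
  // Each pass runs an action; only the first one materializes wordsRDD.
  println(s"$w: " + wordsRDD.filter(_ == w).count())
}
```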
A bonus use, by temporarily adding a cache call: debugging memory issues.

With cache, Spark reports debugging information about the size of the RDD, so in Spark’s integrated web UI you can see the RDD’s memory consumption. This is very helpful for diagnosing memory issues.
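Besides the web UI, the same storage information can be queried programmatically. A sketch using SparkContext.getRDDStorageInfo, which lists only RDDs that are actually persisted:

```scala
val wordsRDD = sc.textFile("/user/emp.txt").flatMap(_.split("\\W"))
wordsRDD.setName("wordsRDD") // the name shown in the UI's Storage tab
wordsRDD.cache()
wordsRDD.count() // an action must run before anything is cached

// Print per-RDD memory and disk usage, mirroring the Storage tab.
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: memory=${info.memSize} bytes, disk=${info.diskSize} bytes")
}
```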