
Conversation

ash211 (Contributor) commented Aug 25, 2014

Instead of jumping straight from 1 partition to all partitions, do exponential
growth and double the number of partitions to attempt each time instead.

Fix proposed by Paul Nepywoda

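For reference, here is a minimal, Spark-free sketch of the scan-growth strategy described above, with an RDD's partitions modeled as plain in-memory sequences. The helper name, the structure, and the growth factor are illustrative only, not the exact code in RDD.take():

def takeSketch[T](partitions: Seq[Seq[T]], num: Int): Seq[T] = {
  val buf = scala.collection.mutable.ArrayBuffer.empty[T]
  val totalParts = partitions.length
  var partsScanned = 0
  var numPartsToTry = 1                 // start by scanning a single partition
  while (buf.size < num && partsScanned < totalParts) {
    val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
    // Each partition in this wave contributes at most the number of rows still needed
    val res = p.map(i => partitions(i).take(num - buf.size))
    res.foreach(r => buf ++= r.take(num - buf.size))
    partsScanned += p.size
    numPartsToTry *= 2                  // geometric growth; the old code effectively jumped to all remaining partitions here
  }
  buf.take(num).toSeq
}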
SparkQA commented Aug 25, 2014

QA tests have started for PR 2117 at commit 3a156b8.

  • This patch merges cleanly.

SparkQA commented Aug 25, 2014

QA tests have finished for PR 2117 at commit 3a156b8.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ash211 (Contributor, Author) commented Aug 25, 2014

Failed unit test looks unrelated:

[info] - spark submit includes jars passed in through --jar *** FAILED ***

aarondav (Contributor):

Why would this be OOM-prone? This would only cause us to run a longer job than we need to if it turns out the first few partitions actually do contain enough results.

Also, this sort of doubling seems like it'd be relatively slow in the case where very few (possibly 0) elements exist, as you'd effectively search the whole thing twice.

Edit: sorry, I forgot that we only continue with unscanned partitions, so this would not cause re-scanning of prior partitions. This may still cause a significant slowdown on an underutilized cluster, however, as we would have log(n) waves rather than just 2 waves of computation over our RDD. It may cost as much time to scan 2 partitions as it does to scan n partitions (for n cores in our cluster).

ash211 (Contributor, Author) commented Aug 26, 2014

The reason it's OOM-prone is that if the result of take() on the first partition has size 0, it then runs take(n) on all the remaining partitions and brings those results back to the driver. It then takes from that set of per-partition take()s until it has filled the requested N results.

That happens here:

val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)

res.foreach(buf ++= _.take(num - buf.size))

Between those two lines, the contents of allPartitions.map(_.take(left)) are in the driver's memory all at once.

In my situation I had 10k partitions, the first one was size=0, and I did a take(300). So after the first one returned 0 results, this pulled 300 results from each of the remaining 9,999 partitions, which OOM'd the driver.

This patch isn't bulletproof in the sense that it relies on the heuristic that you'll get to a doubling that contains enough rows to fulfill the take() before the onslaught of too much data causes an OOM.
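To put rough numbers on that 10k-partition take(300) case, here is some back-of-the-envelope arithmetic (illustrative only, assuming the worst case where every remaining partition returns a full 300 rows):

val totalParts = 10000
val num = 300
// Pre-patch: an empty first partition triggers a scan of all remaining partitions at once,
// so the driver may buffer up to (totalParts - 1) * num rows before trimming to num.
val prePatchRows = (totalParts - 1) * num   // 2,999,700 rows held on the driver
// With the geometric growth in this patch, the next wave covers only 2 partitions,
// so the driver buffers at most a few hundred rows per wave.
val nextWaveRows = 2 * num                  // 600 rows
println(s"pre-patch worst case: $prePatchRows rows; next wave with this patch: $nextWaveRows rows")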

An alternative approach would be to evaluate the .take(n) across the cluster instead and use the new .toLocalIterator call, which didn't exist when this code was first written. This would handle your observation that 2 waves of computation ought to be faster than log(n) waves across the cluster.

I'm thinking something more like:

rdd.mapPartitions(_.take(n)).toLocalIterator.take(n)
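Spelled out, that alternative might look roughly like the sketch below, assuming only the standard RDD API (mapPartitions and toLocalIterator); this is just the idea being floated, not what this patch implements:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def takeViaLocalIterator[T: ClassTag](rdd: RDD[T], n: Int): Array[T] = {
  rdd
    .mapPartitions(_.take(n))   // each partition contributes at most n elements
    .toLocalIterator            // stream results to the driver one partition at a time
    .take(n)
    .toArray
}

The trade-off is that toLocalIterator pulls partitions back one at a time, so driver memory stays bounded at the cost of launching a job per partition.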

mateiz (Contributor) commented Aug 27, 2014

This looks good to me as is because it only happens in the case where the first partition had 0 elements. @aarondav what do you think? This case will be pretty uncommon, and the other code in take() already does a smarter increase.

@ash211 please also fix this in Python, in python/pyspark/rdd.py. The code there is almost identical.

aarondav (Contributor):

Alright, the current solution seems reasonable to me. Maybe we can increase the scale factor to something like 4 instead of 2, as a minor speedup aimed at the case where there are no results, but this is relatively unimportant.

mateiz (Contributor) commented Aug 27, 2014

Yeah 4 sounds good. The reason this code was added in the first place was that take() on filtered datasets was super slow because it originally took only one partition at a time.

ash211 (Contributor, Author) commented Sep 3, 2014

Fixed the same in Python, corrected the comment, and switched from *2 to *4 for a faster take() on mostly-empty partitions.

Let's see how the unit tests go.

SparkQA commented Sep 3, 2014

QA tests have started for PR 2117 at commit 8b2299a.

  • This patch merges cleanly.

SparkQA commented Sep 3, 2014

QA tests have finished for PR 2117 at commit 8b2299a.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ash211 (Contributor, Author) commented Sep 3, 2014

Failed unit test seems unrelated:

- Count two triangles
java.io.IOException: PARSING_ERROR(2)
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)

nchammas (Contributor) commented Sep 3, 2014

@ash211 This sounds like it's related to the OOM reported in SPARK-3333, though after reporting the issue there I was unable to repro the OOM. (Though there were other issues in the mix in that JIRA report that we were able to repro.)

Do you have a simple script that can cause an OOM due to this empty-partitions / .take() issue?

JoshRosen (Contributor):

The OOM was probably due to the Spark driver having too little memory, since a job with a huge number of tasks can have a large working set in the scheduler. I think the default is 512 megabytes.

To increase the driver memory, try setting the SPARK_DRIVER_MEMORY environment variable or passing the --driver-memory flag to spark-submit / pyspark / spark-shell.

mateiz (Contributor) commented Sep 5, 2014

Jenkins, test this please

mateiz (Contributor) commented Sep 5, 2014

Sorry for the delay on this -- fix looks good but I'd like it to run through Jenkins, and Jenkins has been down today.

mateiz (Contributor) commented Sep 5, 2014

Jenkins, test this please

ash211 (Contributor, Author) commented Sep 5, 2014

@nchammas I'm guessing your OOM issue is unrelated to this one.

a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y, sc.defaultParallelism).take(1)

After the reduceByKey above, you'd have 24000 partitions and only 2 entries in them: (4, "NickJohn") and (3, "Bob"). This bug manifests when you have an empty partition 0 and many remaining partitions each with a large amount of data. The .take(n) gets up to the first n from each remaining partition and then takes the first n from the concatenation of those arrays.

For this bug to affect your situation you'd have to have an empty first partition (a good 23998/24000 chance). The driver would then bring into memory 23998 empty arrays and 2 arrays of size 1 (or maybe 1 array of size 2), which I can't imagine would OOM the driver. So I don't think this is your bug.

The other evidence is that you observed a regression (at least in the perf numbers later in your bug), while this take() behavior has been the same for quite some time. The current behavior was implemented in commit 42571d3 and was first released in version 0.9:

aash@aash-mbp ~/git/spark$ git log origin/branch-1.0 | grep 42571d30d0d518e69eecf468075e4c5a823a2ae8
commit 42571d30d0d518e69eecf468075e4c5a823a2ae8
aash@aash-mbp ~/git/spark$ git log origin/branch-0.9 | grep 42571d30d0d518e69eecf468075e4c5a823a2ae8
commit 42571d30d0d518e69eecf468075e4c5a823a2ae8
aash@aash-mbp ~/git/spark$ git log origin/branch-0.8 | grep 42571d30d0d518e69eecf468075e4c5a823a2ae8
aash@aash-mbp ~/git/spark$

ash211 (Contributor, Author) commented Sep 5, 2014

Regarding the merge, I'm guessing this is too late to land in the Spark 1.1 release. Is it a candidate for a backport to a 1.1.x?

nchammas (Contributor) commented Sep 5, 2014

@ash211 Thank you for explaining that.

mateiz (Contributor) commented Sep 5, 2014

Jenkins, test this please

mateiz (Contributor) commented Sep 5, 2014

@ash211 since this is a bug fix it seems fine to put it into 1.1.1.

SparkQA commented Sep 5, 2014

QA tests have started for PR 2117 at commit 8b2299a.

  • This patch merges cleanly.

SparkQA commented Sep 6, 2014

QA tests have finished for PR 2117 at commit 8b2299a.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

mateiz (Contributor) commented Sep 6, 2014

Thanks Andrew -- merged this in.

asfgit closed this in ba5bcad on Sep 6, 2014
asfgit pushed a commit that referenced this pull request on Sep 6, 2014
Instead of jumping straight from 1 partition to all partitions, do exponential
growth and double the number of partitions to attempt each time instead.

Fix proposed by Paul Nepywoda

Author: Andrew Ash <[email protected]>

Closes #2117 from ash211/SPARK-3211 and squashes the following commits:

8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup
e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing
09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well
3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions

(cherry picked from commit ba5bcad)
Signed-off-by: Matei Zaharia <[email protected]>