Skip to content

Conversation

@davies
Copy link
Contributor

@davies davies commented Oct 23, 2015

This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.

Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).

The PrepareRDD may be not needed anymore, could be removed in follow up PR.

The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()

For thread-safety, here what I'm got:

  1. Without calling spill(), the operators should only be used by single thread, no safety problems.

  2. spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.

  3. if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.

  4. During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.

  5. In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).

@SparkQA
Copy link

SparkQA commented Oct 23, 2015

Test build #44210 has finished for PR 9241 at commit 3562476.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class BytesToBytesMapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n * abstract class MemoryConsumer\n

@SparkQA
Copy link

SparkQA commented Oct 23, 2015

Test build #44242 has finished for PR 9241 at commit 0c77c94.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class BytesToBytesMapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n * abstract class MemoryConsumer\n

@SparkQA
Copy link

SparkQA commented Oct 23, 2015

Test build #1948 has started for PR 9241 at commit 5c198cf.

@SparkQA
Copy link

SparkQA commented Oct 24, 2015

Test build #44265 has finished for PR 9241 at commit 5c198cf.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class BytesToBytesMapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillReader extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n * abstract class MemoryConsumer\n

@SparkQA
Copy link

SparkQA commented Oct 24, 2015

Test build #44275 has finished for PR 9241 at commit 86e47ca.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a consequence of no longer needing the reserved page now that we have force-spilling support, right?

@JoshRosen
Copy link
Contributor

I know that this is still WIP, but were you thinking about also enabling this for the two Spillable collections (ExternalAppendOnlyMap and ExternalSorter)? That's probably a lower priority given that we're most concerned about optimizing SQL's memory usage, but it would still be nice to do. If we decide to defer it for now, let's create a followup task to do it in the next release.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind omitting the space between numRecords and --?

@JoshRosen
Copy link
Contributor

Hey @davies,

This patch makes sense to me at a high-level, but I have a few questions:

  • Could you add a description to this pull request and share your list of remaining TODOs with me (maybe by posting it as a checklist in the PR description)?
  • Can you comment on the thread-safety concerns here? My current understanding is that we don't have to worry about memory-manager-triggered spills racing with other interactions on the spillables because the iterator model provides some implicit synchronization. In order to convince ourselves that this is safe in all cases, however, I'd like to think through two corner-cases:
    • What happens if a single task contains multiple threads? Currently, this can happen in PythonRDD, PipedRDD, and a couple of other places. All of these cases are situations where we have a writer or reader thread for interacting with an external process. Although we have separate threads, they are somewhat synchronous / coupled via their interaction with the external process. This could be tricky, though, so I'd like to talk through some examples to make sure we've covered all of the tricky cases.
    • What happens if operator B is in the middle of processing a next() call on its iterator, which calls it's parent's next() method, which requires memory to grow, which triggers a spill that drains memory from A and de-allocates or spills data structures that it's relying on? Do we have to make any distinctions between asking an upstream operator to spill versus a downstream one?
    • Are there any risks of deadlocks with the extra synchronization added here?

I'm going to focus on merging my memory manager unification patch tonight so that we can start rebasing this tomorrow.

@SparkQA
Copy link

SparkQA commented Oct 27, 2015

Test build #44447 has finished for PR 9241 at commit d0ada7b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * public class MemoryConsumer\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class MapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillReader extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n

@davies davies changed the title [SPARK-10342] [SQL] [WIP] Cooperative memory management [SPARK-10342] [SPARK-10309] [SQL] [WIP] Cooperative memory management Oct 27, 2015
@SparkQA
Copy link

SparkQA commented Oct 27, 2015

Test build #44453 has finished for PR 9241 at commit 49b8135.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be final.

@SparkQA
Copy link

SparkQA commented Oct 27, 2015

Test build #44450 has finished for PR 9241 at commit ee6b9a4.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * public class MemoryConsumer\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class MapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillReader extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to catch OOME here, I think that we should do it at a much smaller scope (in the assignment to currentPage but not for adding to dataPages or modifying the page cursor. Given the risks of catching OOME that I mentioned above, the scope of the catch should be as narrow as possible.

@SparkQA
Copy link

SparkQA commented Oct 30, 2015

Test build #44644 has finished for PR 9241 at commit afc8c7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * public abstract class MemoryConsumer\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class MapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillReader extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why set this to -1?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, because we rely on reader.hasNext() when we're dealing with on-disk data.

@JoshRosen
Copy link
Contributor

LGTM overall right now (pending Jenkins for commit 4ee1f42). We can address any minor issues in followups.

I'll take one final glance at one of the new unit tests when I get home, then will merge this to unblock my patch.

@SparkQA
Copy link

SparkQA commented Oct 30, 2015

Test build #44647 has finished for PR 9241 at commit cda4b2a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * public abstract class MemoryConsumer\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class MapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillReader extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n

@SparkQA
Copy link

SparkQA commented Oct 30, 2015

Test build #44652 has finished for PR 9241 at commit 4ee1f42.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * public abstract class MemoryConsumer\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class MapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillReader extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n

@SparkQA
Copy link

SparkQA commented Oct 30, 2015

Test build #44656 has finished for PR 9241 at commit e943e74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * public abstract class MemoryConsumer\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class MapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillReader extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n

@davies
Copy link
Contributor Author

davies commented Oct 30, 2015

@JoshRosen I'm merging this into master, other comments will be addressed by followup PR.

@JoshRosen
Copy link
Contributor

Great, sounds good to me! I do think that this patch might benefit from more comments to explain the code and I'd be happy to help with that in a followup PR.

@asfgit asfgit closed this in 56419cf Oct 30, 2015
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this catch clause be moved to wrap c.spill() at line 142 ?

asfgit pushed a commit that referenced this pull request Apr 12, 2016
## What changes were proposed in this pull request?

Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization.
This is a regression partially introduced in PR #9241

## How was this patch tested?

Tested by running a job and observed around 30% speedup after this change.

Author: Sital Kedia <[email protected]>

Closes #12285 from sitalkedia/executor_oom.
asfgit pushed a commit that referenced this pull request Apr 12, 2016
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization.
This is a regression partially introduced in PR #9241

Tested by running a job and observed around 30% speedup after this change.

Author: Sital Kedia <[email protected]>

Closes #12285 from sitalkedia/executor_oom.

(cherry picked from commit d187e7d)
Signed-off-by: Davies Liu <[email protected]>

Conflicts:
	core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
zzcclp pushed a commit to zzcclp/spark that referenced this pull request Apr 13, 2016
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization.
This is a regression partially introduced in PR apache#9241

Tested by running a job and observed around 30% speedup after this change.

Author: Sital Kedia <[email protected]>

Closes apache#12285 from sitalkedia/executor_oom.

(cherry picked from commit d187e7d)
Signed-off-by: Davies Liu <[email protected]>

Conflicts:
	core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

(cherry picked from commit 413d060)
liyezhang556520 pushed a commit to liyezhang556520/spark that referenced this pull request Apr 15, 2016
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization.
This is a regression partially introduced in PR apache#9241

Tested by running a job and observed around 30% speedup after this change.

Author: Sital Kedia <[email protected]>

Closes apache#12285 from sitalkedia/executor_oom.

(cherry picked from commit d187e7d)
Signed-off-by: Davies Liu <[email protected]>

Conflicts:
	core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
asfgit pushed a commit that referenced this pull request Apr 21, 2016
…same thread for memory

## What changes were proposed in this pull request?
In #9241 It implemented a mechanism to call spill() on those SQL operators that support spilling if there is not enough memory for execution.
But ExternalSorter and AppendOnlyMap in Spark core are not worked. So this PR make them benefit from #9241. Now when there is not enough memory for execution, it can get memory by spilling ExternalSorter and AppendOnlyMap in Spark core.

## How was this patch tested?
add two unit tests for it.

Author: Lianhui Wang <[email protected]>

Closes #10024 from lianhuiwang/SPARK-4452-2.
Parth-Brahmbhatt pushed a commit to Parth-Brahmbhatt/spark that referenced this pull request Jul 25, 2016
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization.
This is a regression partially introduced in PR apache#9241

Tested by running a job and observed around 30% speedup after this change.

Author: Sital Kedia <[email protected]>

Closes apache#12285 from sitalkedia/executor_oom.
Parth-Brahmbhatt pushed a commit to Parth-Brahmbhatt/spark that referenced this pull request Jul 25, 2016
…same thread for memory

In apache#9241 It implemented a mechanism to call spill() on those SQL operators that support spilling if there is not enough memory for execution.
But ExternalSorter and AppendOnlyMap in Spark core are not worked. So this PR make them benefit from apache#9241. Now when there is not enough memory for execution, it can get memory by spilling ExternalSorter and AppendOnlyMap in Spark core.

add two unit tests for it.

Author: Lianhui Wang <[email protected]>

Closes apache#10024 from lianhuiwang/SPARK-4452-2.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants