
Conversation

@sitalkedia

What changes were proposed in this pull request?

Fix a memory leak in the sorter. When the UnsafeExternalSorter spills its data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOMs and also memory underutilization.
This is a regression partially introduced in PR #9241.
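
As a rough sketch of the idea (not the exact diff; the real change touches ShuffleInMemorySorter and UnsafeInMemorySorter, and the field names here are approximate), the in-memory sorter's reset() hands the grown pointer array back to the memory manager and re-allocates one of the initial capacity whenever the external sorter spills:

// Sketch only: on reset(), drop the bloated pointer array and start over at the
// initial capacity, so the freed memory becomes available to other tasks.
public void reset() {
  if (array != null) {
    consumer.freeArray(array);                  // return the grown array to the task memory manager
  }
  pos = 0;
  array = consumer.allocateArray(initialSize);  // re-allocate at the original, small size
}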

How was this patch tested?

Tested by running a job and observing around a 30% speedup after this change.

@sitalkedia force-pushed the executor_oom branch 2 times, most recently from 4be2021 to 707c9bc on April 10, 2016 at 08:34
@sitalkedia
Author

cc @davies

@andrewor14
Contributor

ok to test

@SparkQA

SparkQA commented Apr 12, 2016

Test build #55544 has finished for PR 12285 at commit 707c9bc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

@andrewor14 - Thanks for taking a look. Handled the test case failures.

@SparkQA

SparkQA commented Apr 12, 2016

Test build #55561 has finished for PR 12285 at commit c318a35.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

@andrewor14 - Seems like some transient Jenkins failure. Can we rerun the test?

@davies
Contributor

davies commented Apr 12, 2016

@sitalkedia I think this is not a memory leak, it just does not release the memory as soon as possible. What does your plan look like?

@sitalkedia
Author

@davies - Thanks for looking into it. I agree with you that it's not a memory leak, because that memory may be used later. However, not reducing the pointer array back to its initial size on spill causes heavy memory underutilization: the tasks are not able to get sufficient memory for storing the records, and this situation often leads to executor OOM. Also, I don't see any reason why we would want to keep the bloated pointer array if we are spilling all the data to disk and have nothing left to store in it. This change restores the behavior of the sorter before PR #9241; see https://github.com/apache/spark/pull/9241/files#diff-3eedc75de4787b842477138d8cc7f150L321.

The physical plan looks something like this -

== Physical Plan ==
SortBasedAggregate(key=[shard_id#7L,id#11L,target#9,target_id#12L], functions=[(hiveudaffunction(HiveFunctionWrapper(UDAFCollectMap@270df931),feature_id#10,feature_value#13,false,0,0),mode=Complete,isDistinct=false)], output=[shard_id#7L,id#11L,target#9,target_id#12L,feature_map#14])
+- ConvertToSafe
   +- Sort [shard_id#7L ASC,id#11L ASC,target#9 ASC,target_id#12L ASC], false, 0
      +- TungstenExchange hashpartitioning(shard_id#7L,321), None
         +- Project [(id#11L % 321) AS shard_id#7L,id#11L,target#9,target_id#12L,feature_id#10,feature_value#13]
            +- Filter ((((id#11L > 0) && (target_id#12L > 0)) && NOT (id#11L = target_id#12L)) && (cast(feature_value#13 as double) > 0.001))
               +- HiveTableScan [id#11L,feature_value#13,feature_id#10,target#9,target_id#12L], MetastoreRelation x, y, None, [(ds#8 = 2016-03-20),target#9 IN (1,2,3),feature_id#10 INSET (1, 2)]

@davies
Contributor

davies commented Apr 12, 2016

In your case, inside the sort, the key has 4 columns and the row has 6 columns, so each pair needs about 90 bytes, while the array used by the sort needs 16 bytes per record, so the memory used by the array should be about 15% of all the memory used by execution. In the worst case, freeing the array only at the end could waste about 15% of the memory, so how can it make that big a difference?
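
For concreteness, reading off the numbers above (about 90 bytes per record pair plus 16 bytes of pointer-array entry per record), the array's share of execution memory is roughly

$$\frac{16}{90 + 16} \approx 15\%.$$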

If your data set is huge and requires spilling multiple times, the size of the spilled data should be similar each time, so the required array size should be similar. If we free it only at the end, we don't need to grow the array in the middle of two spills (growing requires 50% more memory for the array); that's the reason I changed it to free the array only at the end.

The reason your job OOMs is that the memory used by the Hive UDAF UDAFCollectMap is not managed by Spark; a better solution could be to reduce the memory fraction for Spark to leave more memory for UDAFCollectMap. After this patch, you may still see OOMs if UDAFCollectMap uses even more memory.

I agree that the current patch is good (it tries to free as much memory as it can). I'm just trying to understand more; please correct me if something is wrong.

@sitalkedia
Author

@davies Thanks for the explanation, your calculation makes sense. You are right that freeing the array can only make a difference of about 15% in the ideal case. But what we are experiencing is something different.

Consider the following scenario: we have 10G of total shuffle memory available for 5 tasks, so in the ideal situation each task should be assigned 2G of shuffle memory. Out of those 2G, 300MB should be allocated to the pointer array and the rest to storing the records. Now let's say 3 of the tasks finish at the same time and, before the driver can run additional tasks on the executor, the remaining 2 running tasks aggressively expand their memory and take up to 5G of shuffle memory on the executor, resulting in a pointer array of around 750MB. When the driver then runs 3 additional tasks on the executor, the previous 2 tasks are forced to spill, but the 750MB pointer array is never freed. This results in heavy memory underutilization for the tasks, and in cases where the pointer array actually grew beyond the task's fair share of memory, it results in executor OOM, killing all other tasks on the executor.
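
Spelling out the numbers in this scenario: a task holding 5G of execution memory carries a pointer array of about

$$5\,\text{GB} \times 15\% \approx 750\,\text{MB},$$

and once the fair share drops back to 10G / 5 = 2G per task, that stale array alone pins roughly

$$\frac{750\,\text{MB}}{2\,\text{GB}} \approx 37\%$$

of the task's budget before a single record is stored.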

The job we are running processes a huge data set of more than 50TB, and we were seeing more than 5% of tasks fail due to OOM. After this fix, the failure rate has come down to less than 0.01% and we gained a massive 30% CPU speedup.

@davies
Contributor

davies commented Apr 12, 2016

That makes sense, thanks for the explanation.

Contributor

I think we can still call it reset, right?

Contributor

Or call it shrinkMemory() and return the size of freed memory?
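
For illustration only, a hypothetical shrinkMemory() along the lines of this suggestion (not what was ultimately merged; the final diff keeps calling inMemSorter.reset(), and the names below are approximate) might look like:

// Hypothetical variant: shrink the pointer array and report how many bytes were freed.
public long shrinkMemory() {
  long bytesBefore = array.size() * 8L;         // bytes held by the current pointer array
  consumer.freeArray(array);                    // give the grown array back
  array = consumer.allocateArray(initialSize);  // re-allocate at the initial capacity
  pos = 0;
  return bytesBefore - array.size() * 8L;       // amount of memory released
}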

Author

good idea, will do.

@SparkQA

SparkQA commented Apr 12, 2016

Test build #2777 has finished for PR 12285 at commit c318a35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia force-pushed the executor_oom branch 2 times, most recently from 75a44f9 to b102c25 on April 12, 2016 at 17:47
@sitalkedia
Author

@davies - Thanks for the review. I have addressed all the comments, please let me know how it looks.

@SparkQA

SparkQA commented Apr 12, 2016

Test build #55626 has finished for PR 12285 at commit 75a44f9.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2016

Test build #55627 has finished for PR 12285 at commit b102c25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

I'm sorry that I misread this last night. This is spillSize (the number of bytes written to disk), not the amount of freed memory, so we don't need to add the amount from inMemSorter.

Sorry again.

@davies
Contributor

davies commented Apr 12, 2016

@sitalkedia Sorry for the trouble.

@sitalkedia
Author

@davies - no issues, I will change it back.


writeSortedFile(false);
final long spillSize = freeMemory();
inMemSorter.reset();
Contributor

Do we really need to move this call?

Author

Yes, we need to reset the pointer array only after freeing up the memory pages holding the records. Otherwise the task might not get memory for the pointer array if it is already holding a lot of memory.
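
To spell out the ordering (a sketch assembled from the snippet above, not the verbatim patch):

// Spill path, sketched: free the record pages first, then shrink the pointer
// array, so the small re-allocation inside reset() cannot fail while the task
// is still holding all of its data pages.
writeSortedFile(false);               // flush the in-memory records to disk
final long spillSize = freeMemory();  // 1) free the memory pages holding the records
inMemSorter.reset();                  // 2) only now shrink/re-allocate the pointer array
return spillSize;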

writeSortedFile(false);
final long spillSize = freeMemory();
inMemSorter.reset();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the records.
Contributor

We can move this comment into reset()

Author

IMO, keeping the comment in ShuffleExternalSorter makes it easier to get the context and understand. Also, in the future, if someone tries to move this call, they will not do so after seeing the comment. If the comment is in the reset() function, someone might inadvertently move this call without ever seeing it. However, if you have a strong opinion about it, I will gladly move the comment into reset(). Let me know what you think.

Contributor

OK, I don't have a strong opinion on it.

@davies
Contributor

davies commented Apr 12, 2016

LGTM, will merge once the tests pass. Thanks for working on it.

@sitalkedia
Author

Thanks a lot for your quick review and response :).

@SparkQA

SparkQA commented Apr 12, 2016

Test build #55646 has finished for PR 12285 at commit d89adf8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit closed this in d187e7d on Apr 12, 2016
asfgit pushed a commit that referenced this pull request Apr 12, 2016
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization.
This is a regression partially introduced in PR #9241

Tested by running a job and observed around 30% speedup after this change.

Author: Sital Kedia <[email protected]>

Closes #12285 from sitalkedia/executor_oom.

(cherry picked from commit d187e7d)
Signed-off-by: Davies Liu <[email protected]>

Conflicts:
	core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@davies
Contributor

davies commented Apr 12, 2016

Merged into master and 1.6 branch (fixed the conflicts)

zzcclp pushed a commit to zzcclp/spark that referenced this pull request Apr 13, 2016
liyezhang556520 pushed a commit to liyezhang556520/spark that referenced this pull request Apr 15, 2016
Parth-Brahmbhatt pushed a commit to Parth-Brahmbhatt/spark that referenced this pull request Jul 25, 2016
@notflorian

notflorian commented Aug 11, 2016

Sorry to bump this old issue.

I have a similar issue: with Spark 1.6.1 in Scala, I get a lot of executor OOMs when I try to write the contents of an RDD into multiple gzipped files in Hadoop:

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

rdd.saveAsHadoopFile(path,
    classOf[String],
    classOf[String],
    classOf[RDDMultipleTextOutputFormat],
    classOf[GzipCodec])

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[String, String] {
  // Write each record to a file whose name is derived from its key.
  override def generateFileNameForKeyValue(key: String, value: String, name: String) = key + name

  // Drop the key from the output; only the value is written.
  override def generateActualKey(key: String, value: String) = null
}

It worked fine when I tried rdd.saveAsTextFile, or rdd.saveAsHadoopFile without the GzipCodec.

Do you think the root cause of my issue could also be this memory leak in the Sorter?

Thanks a lot for your help.
