
Conversation

@squito (Contributor) commented Aug 25, 2015

This eliminates "skipped" stages for jobs that share shuffle dependencies, and instead reuses the same stage. This is done by not removing ShuffleMapStages when a job finishes, but instead waiting until the shuffle is cleaned up by the context cleaner. It does increase memory usage for long-running jobs with lots of stages (though it's the same order of magnitude as before, since we already hold on to shuffle data in the MapOutputTracker).

The advantage is simplified code and a clearer experience for the end user: jobs that reference an already completed stage link to that completed stage, rather than to a new stage that gets "skipped", which is always confusing. (Perhaps it could still use a better UI treatment to make it clear that the stage had already completed as part of a previous job.)
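For illustration only, here is a rough, self-contained sketch of the reuse idea (the types and method names are stand-ins, not the actual DAGScheduler code in this patch): a ShuffleMapStage is registered by shuffle id, handed back to any later job that references the same shuffle, and dropped only when the context cleaner cleans that shuffle rather than when a job finishes.

```scala
import scala.collection.mutable

// Stand-ins for the real scheduler types, only to keep the sketch runnable.
case class ShuffleDependency(shuffleId: Int)
case class ShuffleMapStage(id: Int, shuffleId: Int)

class StageRegistry {
  private var nextStageId = 0
  private val shuffleIdToMapStage = mutable.HashMap.empty[Int, ShuffleMapStage]

  // A later job that references the same shuffle gets the existing stage back
  // (with its completed map outputs) instead of a new stage shown as "skipped".
  def getOrCreateShuffleMapStage(dep: ShuffleDependency): ShuffleMapStage =
    shuffleIdToMapStage.getOrElseUpdate(dep.shuffleId, {
      val stage = ShuffleMapStage(nextStageId, dep.shuffleId)
      nextStageId += 1
      stage
    })

  // Invoked only when the ContextCleaner cleans the shuffle, not at job completion.
  def handleShuffleCleaned(shuffleId: Int): Unit =
    shuffleIdToMapStage.remove(shuffleId)
}
```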

@squito (Contributor, Author) commented Aug 25, 2015

Jenkins, retest this please

@markhamstra (Contributor)

This is going to increase memory pressure. The very early code never cleaned up the Stage-tracking data structures at all, which was clearly unacceptable for long-running Applications. What we have now cleans up as soon as possible, and thus has minimal memory pressure. What you have in this PR lands somewhere in between, and could cause problems if a lot of Stages stick around for a long time.

@SparkQA commented Aug 25, 2015

Test build #41556 has finished for PR 8427 at commit 830e0c8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode
    • case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)
    • case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)
    • case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)

@squito changed the title from "[SPARK-10193] [core] [wip]" to "[SPARK-10193] [core] [wip] Eliminate Skipped Stages by reusing ShuffleMapStages" on Aug 25, 2015
@squito (Contributor, Author) commented Aug 25, 2015

@markhamstra yup, no question this will increase memory usage. The question is, should we consider it anyway? Maybe you were implicitly answering "no", but I'm gonna make my case again in any case :)

Clearly, if you have long-running jobs with lots of stages and you never do anything to clean them up, then stageIdToStage is going to eat up all your memory. But that will happen anyway: you'll already run out of memory because of the MapOutputTracker storing shuffle output (and most likely the huge number of RDDs you've created that can't be gc'ed either). We add a few more hashmap entries and more Stage objects, which shouldn't contain anything huge -- no bigger than what we're already tracking. Certainly it'll have an effect, though.

I think it's a pretty big usability improvement, so worth considering, but that is totally subjective. I realize this is a bit hand-wavy for now -- I'll try to quantify the memory usage effect so we can make a more informed decision (if others are still interested).
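One possible way to quantify it (a sketch, not part of this patch; the listener class and its output format are made up for illustration): attach a SparkListener and approximate the retained per-stage metadata with SizeEstimator, which only gives a rough, JVM-dependent figure but is enough for an order-of-magnitude check.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.util.SizeEstimator

// Tracks an approximate running total of completed-stage metadata. StageInfo is
// only a proxy for the scheduler's internal Stage objects, so treat the number
// as a ballpark, not an exact measurement.
class StageFootprintListener extends SparkListener {
  private var totalBytes = 0L

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    totalBytes += SizeEstimator.estimate(event.stageInfo)
    println(s"~${totalBytes / 1024} KB of stage metadata retained so far")
  }
}

// sc.addSparkListener(new StageFootprintListener())
```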

@markhamstra (Contributor)

I wasn't intending to answer "no", but rather just wanting to make sure that we think through the implications of this change. It will increase memory pressure some, but I agree that it shouldn't be a lot because of the already present references via the MapOutputTracker. On balance, I'm inclined to agree with you that this is worth doing.

@SparkQA commented Aug 26, 2015

Test build #41625 timed out for PR 8427 at commit 4931ccc after a configured wait of 175m.

@mridulm (Contributor) commented Jan 24, 2016

Just a note about the MapOutputTracker: it is fairly trivial to make it use a bare minimum of memory even if it does not get cleaned up for "old" stages, by using a disk-backed map (MapDB, for example) as an LRU cache.
That keeps at most the current and previous map outputs in memory and everything else on disk (until a node failure requires recomputation, which brings portions of it back into memory).

This is what we used to do for production jobs in some earlier projects.

I am not sure what the impact of the current proposal is from a memory-overhead point of view -- map output was (obviously) expensive enough to attempt this, and the effect was not pervasive/diffuse across the codebase for shuffle output tracking.
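For reference, a minimal sketch of the disk-backed LRU idea described above (illustrative only; the class and its spill/reload hooks are hypothetical, and the real MapOutputTracker is structured differently). MapDB, or any other key-value store, could sit behind the writeToDisk/readFromDisk functions:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Keeps up to `maxInMemory` shuffle statuses in memory (access-ordered LRU);
// anything evicted is written to a disk-backed store and reloaded on demand,
// e.g. when a node failure forces recomputation.
class SpillingShuffleStatusMap[V](
    maxInMemory: Int,
    writeToDisk: (Int, V) => Unit,
    readFromDisk: Int => Option[V]) {

  private val inMemory = new JLinkedHashMap[Int, V](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[Int, V]): Boolean = {
      if (size() > maxInMemory) {
        writeToDisk(eldest.getKey, eldest.getValue)  // spill instead of dropping
        true
      } else {
        false
      }
    }
  }

  def put(shuffleId: Int, status: V): Unit = synchronized {
    inMemory.put(shuffleId, status)
  }

  def get(shuffleId: Int): Option[V] = synchronized {
    Option(inMemory.get(shuffleId)).orElse {
      val restored = readFromDisk(shuffleId)
      restored.foreach(v => inMemory.put(shuffleId, v))
      restored
    }
  }
}
```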

@rxin (Contributor) commented Jun 15, 2016

Thanks for the pull request. I'm going through a list of pull requests to cut them down since the sheer number is breaking some of the tooling we have. Due to lack of activity on this pull request, I'm going to push a commit to close it. Feel free to reopen it or create a new one. We can also continue the discussion on the JIRA ticket.

@asfgit closed this in 1a33f2e on Jun 15, 2016
