
Conversation

@mateiz (Member) commented Aug 14, 2013

This is a work in progress to simplify and improve a few things in the TaskSet scheduler and the way we handle locality. Here's a summary of changes (see also the commit messages):

  • Eliminated most hostPort-based data structures and just used executor IDs instead, since the hostPort vs host distinction was confusing (and not checkable with static typing, leading to ugly debug code), and hostPorts are not provided by Mesos. The hostPort code was there from before we could easily pass executor IDs through.
  • Removed the non-local fallback logic in ClusterScheduler that tried to launch less-local tasks on a node once the local ones were all assigned. That fallback didn't work well because many cluster schedulers send offers for just one node at a time (even the standalone and YARN ones do, since nodes join the cluster one by one), so lots of non-local tasks would be assigned even though a node with locality for them could receive tasks just a short time later.
  • Added multi-level delay scheduling in ClusterTaskSetManager so you can set a different wait for each locality level (see the sketch at the end of this description). This covers part of the use case for the fallback above, for users who wanted to skip node-local waits and go straight for rack-local.
  • Updated the way ClusterTaskSetManager handles racks: instead of enqueueing a task to a separate queue for all the hosts in the rack, which would create lots of large queues, have one queue per rack name.
  • Added periodic revival of offers in StandaloneSchedulerBackend (this was an option in the YARN patch but seems good to have by default).
  • Renamed map output tracker "generations" to "epochs".
  • Simplified and cleaned up a bunch of the code.

The only thing missing is new unit tests for this stuff, which I'm working on, but the code appears to run fine so I wanted to put it up for others to try.
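
To make the multi-level delay scheduling idea concrete, here is a minimal, self-contained sketch of the bookkeeping involved. It is illustrative only and does not mirror the actual ClusterTaskSetManager code; the level names match Spark's locality levels, and the 3000 ms fallback matches the 3-second default wait discussed later in this thread.

    object LocalityLevel extends Enumeration {
      val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
    }

    class DelayScheduler(waitPerLevel: Map[LocalityLevel.Value, Long]) {
      private val levels = LocalityLevel.values.toIndexedSeq
      private var currentIndex = 0
      private var lastLaunchTime = System.currentTimeMillis

      // Most local level we are currently allowed to schedule at: step down one
      // level each time the wait configured for the current level has elapsed.
      def allowedLevel(now: Long): LocalityLevel.Value = {
        while (currentIndex < levels.size - 1 &&
               now - lastLaunchTime >= waitPerLevel.getOrElse(levels(currentIndex), 3000L)) {
          lastLaunchTime += waitPerLevel.getOrElse(levels(currentIndex), 3000L)
          currentIndex += 1
        }
        levels(currentIndex)
      }

      // When a task launches, jump back to the level it actually launched at so
      // that later tasks get a fresh chance at better locality.
      def recordLaunch(now: Long, launchedAt: LocalityLevel.Value): Unit = {
        lastLaunchTime = now
        currentIndex = levels.indexOf(launchedAt)
      }
    }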

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/581/

Contributor

Matei - what are the semantics of these different waits? Say that you want a task to bump from PROCESS_LOCAL to NODE_LOCAL after 10 seconds. Would you set the wait to 10 seconds for PROCESS_LOCAL or NODE_LOCAL? It would be good to make that clear in the docs.

It might make sense to have the default wait be different for different levels so that the "turn key" performance is pretty good. For instance, it's probably fine to make the PROCESS_LOCAL -> NODE_LOCAL switch after only a few seconds. But going from PROCESS_LOCAL to RACK_LOCAL should be a lot longer.

Member Author

Good idea; I'll add a doc page on this. Basically there is a separate jump between each pair of levels: for example process -> (wait 3 sec) -> node -> (wait 3 more sec) -> rack. You can configure each wait separately.

My instinct was that it's not obvious which jumps will cost more, so I'd rather have a simple default setting. For example, going from process to node could be really bad for data that's expensive to serialize, while going from node to rack might be very cheap with a fast network.
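
For example, a minimal way to set the three waits, assuming the system-property style of configuration Spark used at the time (the property names are the ones added by this PR; the 3000 ms values are only illustrative):

    // Set before creating the SparkContext.
    System.setProperty("spark.locality.wait.process", "3000") // wait before process-local -> node-local
    System.setProperty("spark.locality.wait.node", "3000")    // wait before node-local -> rack-local
    System.setProperty("spark.locality.wait.rack", "3000")    // wait before rack-local -> any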

Contributor

I see; so I guess my main question was whether this wait attaches to the level we are jumping to or the level we are jumping from. Since you can't set it for ANY, I guess it's the level you are jumping from.

@mridulm (Contributor) commented Aug 14, 2013

Hi Matei,

I have added a few comments; unfortunately, I am unable to go through this at the level of detail I would like to, so apologies for that.

In general, though, a few things:

a) A few sync blocks were removed; we might need to reintroduce those (see the sketch after this comment). This is not just for thread safety, which might not be a big issue, but also to ensure visibility of updates, since the same thread need not always be used by Akka, etc.

b) I noticed the change from hostPort to host and executorId; would it not be better to remove host also?
The earlier code was more of a hack, since I was trying to reconcile the pre-executorId codebase with significant drift in the YARN branch, which had the host -> hostPort change.

c) I added a few comments in the code about scheduling in general and why we hacked in the reviveOffers thread; coupled with my earlier mail on this subject, hopefully I have not messed up explaining my rationale for those changes!

Thanks,
Mridul
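
A minimal illustration of the visibility point in (a), not code from this patch (the class and field names here are made up): a plain var mutated from one Akka dispatcher thread is not guaranteed to be visible to another thread that reads it later unless both accesses synchronize (or the field is @volatile), even when there is no real race.

    class OfferTracker {
      private var lastReviveTime = 0L  // hypothetical field, for illustration only

      // Synchronizing both the write and the read provides the happens-before
      // edge that makes the update visible across threads.
      def recordRevive(now: Long): Unit = synchronized { lastReviveTime = now }
      def lastRevive: Long = synchronized { lastReviveTime }
    }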

@mateiz (Member Author) commented Aug 14, 2013

Thanks for the feedback, Mridul, this is super helpful! I'll address some of the issues you brought up.

Contributor

minor typo: readding

Contributor

oh, nevermind! re-adding, not readding :)

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/610/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/635/

mateiz added 6 commits August 18, 2013 19:51
- Replace use of hostPort vs host in Task.preferredLocations with a
  TaskLocation class that contains either an executorId and a host or
  just a host. This is part of a bigger effort to eliminate hostPort
  based data structures and just use executorID, since the hostPort vs
  host stuff is confusing (and not checkable with static typing, leading
  to ugly debug code), and hostPorts are not provided by Mesos.

- Replaced most hostPort-based data structures and fields as above.

- Simplified ClusterTaskSetManager to deal with preferred locations in a
  more concise way and generally be more concise.

- Updated the way ClusterTaskSetManager handles racks: instead of
  enqueueing a task to a separate queue for all the hosts in the rack,
  which would create lots of large queues, have one queue per rack name.

- Removed non-local fallback stuff in ClusterScheduler that tried to
  launch less-local tasks on a node once the local ones were all
  assigned. This change didn't work because many cluster schedulers send
  offers for just one node at a time (even the standalone and YARN ones
  do so as nodes join the cluster one by one). Thus, lots of non-local
  tasks would be assigned even though a node with locality for them
  would be able to receive tasks just a short time later.

- Renamed MapOutputTracker "generations" to "epochs".
- Added periodic revival of offers in StandaloneSchedulerBackend

- Replaced task scheduling aggression with multi-level delay scheduling
  in ClusterTaskSetManager

- Fixed ZippedRDD preferred locations because they can't currently be
  process-local

- Fixed some uses of hostPort
- When a resourceOffers() call has multiple offers, force the TaskSets
  to consider them in increasing order of locality levels so that they
  get a chance to launch stuff locally across all offers

- Simplify ClusterScheduler.prioritizeContainers

- Add docs on the new configuration options
resetting locality level after a non-local launch
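
As a rough sketch of the TaskLocation idea from the first commit message above (the class in the actual patch may be shaped differently): a preferred location always names a host, and optionally a specific executor on that host. The hostnames and executor ID below are example values only.

    case class TaskLocation(host: String, executorId: Option[String] = None)

    object TaskLocationExample {
      // A location tied to a particular executor (e.g. cached data) vs. one tied only to a host:
      val cachedPartition = TaskLocation("host1.example.com", Some("executor-3"))
      val hdfsBlock = TaskLocation("host1.example.com")
    }
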
@mateiz (Member Author) commented Aug 19, 2013

Jenkins, please retest this

@mateiz (Member Author) commented Aug 19, 2013

Alright, I think this is pretty much done; I also added some unit tests and found a bug with them (ClusterTaskSetManager was setting currentLocalityIndex to allowedLocality instead of taskLocality).

One final improvement might be to add support for rack locality in non-YARN modes, but we can do that as a separate PR.
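
For readers following along, a tiny sketch of the bug described above, using only the names from this comment (the real ClusterTaskSetManager code is more involved): after a launch, the delay-scheduling index should track the locality the task actually achieved, not the less-local level the scheduler was merely allowed to use.

    class LocalityIndexTracker {
      var currentLocalityIndex = 0

      def onTaskLaunched(taskLocalityIndex: Int, allowedLocalityIndex: Int): Unit = {
        // The fix: the buggy version assigned allowedLocalityIndex here.
        currentLocalityIndex = taskLocalityIndex
      }
    }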

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/646/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/647/

@mridulm (Contributor) commented Aug 19, 2013

I ran some tests with this PR (as pulled on Saturday). The results were mixed.

The setup:
a) baseline - spark.tasks.revive_periodic.interval=200, spark.tasks.schedule.aggression=RACK
b) pr applied, enabled - spark.locality.wait.process=0 spark.locality.wait.node=0 spark.locality.wait.rack=3000 (to mirror (a) as closely as possible).
c) pr applied, but not enabled - so run with default config.

The relative performance numbers below are over 4 runs of the experiments, using both the min and the average of the time taken via the 'time' command (so end-to-end time).

For lower numbers of nodes (<= 10), (b) performed better: from 2% to 10% across the experiments. For (c), the results ranged from break-even to about 4% worse.

But as the number of nodes increases, the relative performance worsens, and somewhere between 20 and 25 nodes, (b) becomes worse than (a).
At 100 nodes, (b) is about 18% worse and (c) is about 41% worse.

I did not try with larger numbers, though (partly due to diminishing returns for the dataset, and partly because the graphs were diverging anyway, so intuitively it should worsen).

This is much better than earlier, though (without 2a4ed10).
Also note that the relative difference between (b) and (c) is much lower than earlier (with and without aggression), which can be taken as a positive change: default out-of-the-box performance is better.

I am planning to rerun the experiment set once again with the latest PR, just to be sure.

@mateiz (Member Author) commented Aug 19, 2013

I actually did fix a small bug in it yesterday, where it wasn't properly remembering the locality of each launched task.

Instead of c), can you also try setting spark.locality.wait.process=3000, spark.locality.wait.node=0, and spark.locality.wait.rack=3000? This is a new configuration possible with this PR that lets you launch tasks on HDFS data in a rack-local manner while still getting good locality for cached data. (Is your test actually using any caching, by the way?)
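
For reference, that suggestion in system-property form (illustrative only; the property names are the ones introduced by this PR): keep a wait at the process level so cached data is still read in-process, skip the node-local wait, and keep a wait at the rack level so HDFS reads stay at least rack-local.

    System.setProperty("spark.locality.wait.process", "3000")
    System.setProperty("spark.locality.wait.node", "0")
    System.setProperty("spark.locality.wait.rack", "3000")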

@mateiz (Member Author) commented Aug 19, 2013

Also, if you want to match a), you need to set spark.scheduler.revive.interval=200 in the new code too.

@mridulm (Contributor) commented Aug 19, 2013

I was trying to replicate an exactly similar test environment for both the baseline and the PR change (hence spark.locality.wait.process=0).

I did not notice that the revive thread timeout was configurable; I will change the test to use spark.scheduler.revive.interval=200, thanks!

@mridulm (Contributor) commented Aug 19, 2013

Looks great now, Matei!
With the latest commits, the preliminary numbers with the PR enabled are within the margin of error of the baseline.

@mateiz (Member Author) commented Aug 20, 2013

Thanks for testing it, Mridul. It also does fine on Patrick's regression tests (https://github.com/amplab/spark-perf), so I'm going to merge it in.

mateiz added a commit that referenced this pull request Aug 20, 2013
Scheduler fixes and improvements
@mateiz mateiz merged commit 8cae72e into mesos:master Aug 20, 2013