
Conversation

@ericl
Contributor

@ericl ericl commented Sep 2, 2016

What changes were proposed in this pull request?

DAGScheduler invalidates shuffle files when an executor loss event occurs, but not when the external shuffle service is enabled. This is because when shuffle service is on, the shuffle file lifetime can exceed the executor lifetime.

However, it also doesn't invalidate shuffle files when the shuffle service itself is lost (due to whole slave loss). This can cause long hangs when slaves are lost since the file loss is not detected until a subsequent stage attempts to read the shuffle files.

The proposed fix is to also invalidate shuffle files when an executor is lost due to a SlaveLost event.
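In rough pseudocode, the decision the scheduler would make looks like the sketch below (the helper and its callers are simplified stand-ins, not the actual DAGScheduler API):

```scala
// Minimal sketch of the proposed decision; the loss-reason types mirror Spark's,
// but shuffleFilesLost is an illustrative helper, not the real DAGScheduler method.
sealed trait ExecutorLossReason
case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, message: String)
  extends ExecutorLossReason
case class SlaveLost(message: String = "Slave lost") extends ExecutorLossReason

def shuffleFilesLost(reason: ExecutorLossReason,
                     externalShuffleServiceEnabled: Boolean): Boolean =
  reason match {
    // Whole slave gone: the shuffle service on it (and its files) are gone too.
    case SlaveLost(_) => true
    // Otherwise the files outlive the executor only if the external service is on.
    case _ => !externalShuffleServiceEnabled
  }

// With the external shuffle service enabled, a plain executor exit keeps the files,
// while a slave loss invalidates them.
assert(!shuffleFilesLost(ExecutorExited(1, exitCausedByApp = true, message = "exited"),
  externalShuffleServiceEnabled = true))
assert(shuffleFilesLost(SlaveLost(), externalShuffleServiceEnabled = true))
```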

How was this patch tested?

Unit tests, also verified on an actual cluster that slave loss invalidates shuffle files immediately as expected.

cc @mateiz

@SparkQA

SparkQA commented Sep 2, 2016

Test build #64823 has finished for PR 14931 at commit a704376.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: ExecutorLossReason = null
Contributor

Nit: it would be clearer to make this an Option, similar to failedExecutor, rather than null. The two variables are used together.
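For illustration, a minimal sketch of that shape (the types here are simplified stand-ins, and the body is reduced to just the two variables and how they are consumed together):

```scala
// Sketch only: ExecutorLossReason/SlaveLost are simplified stand-ins, and the real
// statusUpdate body is elided down to the part the nit is about.
sealed trait ExecutorLossReason
case class SlaveLost(message: String = "Slave lost") extends ExecutorLossReason

def statusUpdate(): Unit = {  // tid, state, serializedData elided from the real signature
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None  // an Option instead of a nullable var

  // When an executor is detected as lost, both are populated together ...
  failedExecutor = Some("exec-1")
  reason = Some(SlaveLost())

  // ... and consumed together, with no null check needed.
  for (execId <- failedExecutor; lossReason <- reason) {
    println(s"executor $execId lost: $lossReason")  // stand-in for dagScheduler.executorLost(...)
  }
}
```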

Contributor Author

Done

@vanzin
Contributor

vanzin commented Sep 2, 2016

Hmm... as far as I understand, SlaveLost does not mean "remote machine has died", it means "remote executor has died", which does not mean that the shuffle files were lost. Unless you actually try to connect to the shuffle service and verify whether it's alive, you might be throwing away work.

It's possible that "machine has died" might be the most common cause of "SlaveLost", but I'm a little wary of treating both as the same.

@squito has been looking at node blacklisting and might have better ideas here than I do.

@SparkQA

SparkQA commented Sep 2, 2016

Test build #64862 has finished for PR 14931 at commit 2430b69.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl
Contributor Author

ericl commented Sep 2, 2016

What if we added a flag to SlaveLost indicating if we think the entire host is lost? In many cases that should be true, such as if the event was caused by worker loss or Mesos slave loss.

@vanzin
Contributor

vanzin commented Sep 2, 2016

The issue I see is: how easily can the driver know that? Adding a new flag to the SlaveLost class doesn't mean you know how to set its value.

I'm pretty sure, on the YARN side, that we don't know when hosts die, just that a container on that host went away. Maybe Standalone or Mesos would have that info more easily available (e.g. the WorkerWatcher code for Standalone).

@ericl
Contributor Author

ericl commented Sep 2, 2016

In standalone mode I was thinking of this case: Master calls removeWorker() -> sends ExecutorUpdated -> StandaloneAppClient -> StandaloneSchedulerBackend, which generates the SlaveLost message. There you can always determine whether the cause was worker loss, based on the exit status code provided by the master.

I don't know about Mesos/YARN, though: would the shuffle service be running in a separate container from the executor? We can be conservative and not set the flag for those events.
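Roughly, the conservative scheme would look like this (a sketch with simplified types; the exact standalone-mode change is in the review snippet further down):

```scala
// Illustrative only: each backend derives workerLost from what its cluster manager
// actually reports, and defaults to false when it cannot tell.
sealed trait ExecutorLossReason
case class ExecutorExited(code: Int, exitCausedByApp: Boolean, message: String)
  extends ExecutorLossReason
case class SlaveLost(message: String = "Slave lost", workerLost: Boolean = false)
  extends ExecutorLossReason

// Standalone: the master's exit status tells us whether the whole worker went away.
def standaloneLossReason(exitStatus: Option[Int], message: String,
                         masterSaysWorkerLost: Boolean): ExecutorLossReason =
  exitStatus match {
    case Some(code) => ExecutorExited(code, exitCausedByApp = true, message = message)
    case None => SlaveLost(message, workerLost = masterSaysWorkerLost)
  }

// YARN/Mesos (conservative): we only know a container went away, so we do not
// claim the host and its shuffle service are lost.
def containerLossReason(message: String): ExecutorLossReason =
  SlaveLost(message, workerLost = false)
```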

@ericl
Contributor Author

ericl commented Sep 3, 2016

Updated to only flag worker loss in standalone mode.

@SparkQA

SparkQA commented Sep 3, 2016

Test build #64887 has finished for PR 14931 at commit 93a4852.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)

@vanzin
Contributor

vanzin commented Sep 6, 2016

This looks ok from what I read of the standalone code, but probably someone more familiar with standalone should take a look. @JoshRosen ?

asfgit pushed a commit that referenced this pull request Sep 6, 2016
…ap tasks

## What changes were proposed in this pull request?

It seems that old shuffle map tasks hanging around after a stage resubmit will delete intended shuffle output files on stop(), causing downstream stages to fail even after successful resubmit completion. This can happen easily if the prior map task is waiting for a network timeout when its stage is resubmitted.

This can cause unnecessary stage resubmits, sometimes multiple times as fetch fails cause a cascade of shuffle file invalidations, and confusing FetchFailure messages that report shuffle index files missing from the local disk.

Given that IndexShuffleBlockResolver commits data atomically, it seems unnecessary to ever delete committed task output: even in the rare case that a task is failed after it finishes committing shuffle output, it should be safe to retain that output.

## How was this patch tested?

Prior to the fix proposed in #14931, I was able to reproduce this behavior by killing slaves in the middle of a large shuffle. After this patch, stages were no longer resubmitted multiple times due to shuffle index loss.

cc JoshRosen vanzin

Author: Eric Liang <[email protected]>

Closes #14932 from ericl/dont-remove-committed-files.
  val reason: ExecutorLossReason = exitStatus match {
    case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-   case None => SlaveLost(message)
+   case None => SlaveLost(message, workerLost = true /* worker loss event from master */)
Contributor

This assumes that exitStatus == None implies that a worker was lost, but there are some corner cases where this isn't necessarily true (e.g. if an executor kill fails). Looking through both the 1.6.x and 2.0.x code, it appears that ExecutorState.LOST is used exclusively for denoting whole-worker loss, so I think we should check that state here instead of assuming true. Other than that minor corner case, this patch looks good to me, so I'll merge once we fix this.

Contributor Author

Went with propagating just workerLost explicitly all the way from the master, since ExecutorState is private to deploy.
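Roughly, the propagation looks like this (a sketch with simplified message shapes, not the exact deploy-layer classes):

```scala
// Sketch of the end-to-end workerLost propagation; ExecutorUpdated and SlaveLost
// here are simplified stand-ins for the real deploy/scheduler messages.
case class ExecutorUpdated(execId: Int, message: Option[String],
                           exitStatus: Option[Int], workerLost: Boolean)
case class SlaveLost(message: String = "Slave lost", workerLost: Boolean = false)

// 1. Master: when a worker dies, flag every executor update it sends for that worker.
def onWorkerLost(executorIds: Seq[Int]): Seq[ExecutorUpdated] =
  executorIds.map(id => ExecutorUpdated(id, Some("worker lost"), None, workerLost = true))

// 2. StandaloneSchedulerBackend: carry the flag into the loss reason it reports.
def toLossReason(update: ExecutorUpdated): SlaveLost =
  SlaveLost(update.message.getOrElse("Slave lost"), workerLost = update.workerLost)

// 3. DAGScheduler: a SlaveLost with workerLost = true means the shuffle service on
//    that host is gone, so its map outputs are invalidated even when the external
//    shuffle service is enabled.
```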

@SparkQA

SparkQA commented Sep 7, 2016

Test build #65016 has finished for PR 14931 at commit a62289e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

LGTM.

There's a slight change of behavior here for the corner-case scenario where the worker (not executor) dies and then is immediately recovered: prior to this patch, I believe that the old shuffle files would continue to be served by the restarted worker's shuffle service, but after this patch the MapOutputTracker entries will have been invalidated and the driver won't ask for shuffle files from that worker.

In terms of default / common-case behavior, I prefer the behavior implemented in this patch: when a worker disappears it seems reasonable to treat its map outputs as missing, and if the worker happens to come back later it would make more sense to explicitly re-register those outputs. Even if a worker will eventually be recovered, it might take a long time for that to happen, leading to long hangs.

If we decide that it's important to re-register map outputs after worker recovery then I think we can add that explicitly in a separate patch.

I'm going to merge this to master and will evaluate backporting to branch-2.0.

@asfgit asfgit closed this in 649fa4b Sep 7, 2016
@JoshRosen
Contributor

I'm also going to backport this into branch-2.0 since this is a pretty important robustness fix.

asfgit pushed a commit that referenced this pull request Sep 7, 2016
## What changes were proposed in this pull request?

DAGScheduler invalidates shuffle files when an executor loss event occurs, but not when the external shuffle service is enabled. This is because when shuffle service is on, the shuffle file lifetime can exceed the executor lifetime.

However, it also doesn't invalidate shuffle files when the shuffle service itself is lost (due to whole slave loss). This can cause long hangs when slaves are lost since the file loss is not detected until a subsequent stage attempts to read the shuffle files.

The proposed fix is to also invalidate shuffle files when an executor is lost due to a `SlaveLost` event.

## How was this patch tested?

Unit tests, also verified on an actual cluster that slave loss invalidates shuffle files immediately as expected.

cc mateiz

Author: Eric Liang <[email protected]>

Closes #14931 from ericl/sc-4439.

(cherry picked from commit 649fa4b)
Signed-off-by: Josh Rosen <[email protected]>