Conversation

@cxzl25 (Contributor) commented Jun 28, 2018

What changes were proposed in this pull request?

When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task's duration to the MedianHeap, not just increment tasksSuccessful.

Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is non-zero but the MedianHeap is empty, so successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty, which ultimately stops the SparkContext.
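
For context, the failure is mechanical: MedianHeap.median throws as soon as the heap is empty. A minimal repro sketch of the broken invariant (MedianHeap is private[spark], so this has to live under the org.apache.spark package; the scenario is simplified from the scheduler code):

package org.apache.spark.repro // hypothetical package; MedianHeap is private[spark]

import org.apache.spark.util.collection.MedianHeap

object EmptyMedianHeapRepro {
  def main(args: Array[String]): Unit = {
    val successfulTaskDurations = new MedianHeap()
    // markPartitionCompleted bumped tasksSuccessful because another taskset
    // finished the partition, but it never inserted a duration here.
    val tasksSuccessful = 1
    if (tasksSuccessful > 0) {
      // Mirrors checkSpeculatableTasks: the counter is non-zero, yet the heap
      // is empty, so this throws
      // java.util.NoSuchElementException: MedianHeap is empty.
      val medianDuration = successfulTaskDurations.median
      println(medianDuration)
    }
  }
}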

How was this patch tested?

Added a unit test in TaskSetManagerSuite.scala: "[SPARK-24677] MedianHeap should not be empty when speculation is enabled".

@maropu (Member) commented Jun 28, 2018

Can you add a test to check that no exception is thrown in that condition with this patch?

@cxzl25 (Contributor, Author) commented Jul 2, 2018

@maropu
I have added a unit test.
Can you trigger a test for this?

@cxzl25 (Contributor, Author) commented Jul 4, 2018

@maropu @cloud-fan @squito
Can you trigger a test for this?
This is the exception stack in the log:

ERROR Utils: uncaught error in thread task-scheduler-speculation, stopping SparkContext
java.util.NoSuchElementException: MedianHeap is empty.
at org.apache.spark.util.collection.MedianHeap.median(MedianHeap.scala:83)
at org.apache.spark.scheduler.TaskSetManager.checkSpeculatableTasks(TaskSetManager.scala:968)
at org.apache.spark.scheduler.Pool$$anonfun$checkSpeculatableTasks$1.apply(Pool.scala:94)
at org.apache.spark.scheduler.Pool$$anonfun$checkSpeculatableTasks$1.apply(Pool.scala:93)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.scheduler.Pool.checkSpeculatableTasks(Pool.scala:93)
at org.apache.spark.scheduler.Pool$$anonfun$checkSpeculatableTasks$1.apply(Pool.scala:94)
at org.apache.spark.scheduler.Pool$$anonfun$checkSpeculatableTasks$1.apply(Pool.scala:93)

@squito (Contributor) commented Jul 5, 2018

ok to test

if (speculationEnabled) {
  taskAttempts(index).headOption.map { info =>
    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
    successfulTaskDurations.insert(info.duration)
Contributor (review comment):
what's the normal code path to update task durations?

@cxzl25 (Contributor, Author):

TaskSetManager#handleSuccessfulTask updates successful task durations and writes them to successfulTaskDurations.

When there are multiple tasksets for this stage, markPartitionCompletedInAllTaskSets only increments tasksSuccessful.

In this case, when checkSpeculatableTasks is called, the value of tasksSuccessful matches the condition, but successfulTaskDurations is empty.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L723

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
  if (speculationEnabled) {
    successfulTaskDurations.insert(info.duration)
  }
  // ...
  // There may be multiple tasksets for this stage -- we let all of them know that the partition
  // was completed. This may result in some of the tasksets getting completed.
  sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L987

override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // ...
  if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
    val time = clock.getTimeMillis()
    val medianDuration = successfulTaskDurations.median

@cloud-fan (Contributor):
cc @jiangxb1987

@SparkQA commented Jul 5, 2018

Test build #92631 has finished for PR 21656 at commit 55ddbeb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Jul 5, 2018

Thanks for finding this and suggesting a fix @cxzl25. But I'm not sure it makes sense to use this duration; it's not how long the task actually took to complete. I think it might make more sense to just ignore this task for speculation. I will think about it some more.

cc @markhamstra @tgravescs

private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
  partitionToIndex.get(partitionId).foreach { index =>
    if (!successful(index)) {
      if (speculationEnabled) {
Contributor (review comment):

IIUC, in this case no task in this taskSet actually finishes successfully; it's another task attempt from another taskSet for the same stage that succeeded. Instead of changing this code path, I'd suggest we add another flag to show whether any task succeeded in the current taskSet, and if no task has succeeded, skip L987.

WDYT @squito ?

Contributor (review comment):

Yeah, that is sort of what I was suggesting -- but rather than just a flag, maybe we separate tasksSuccessful into tasksCompletedSuccessfully (from this taskset) and tasksNoLongerNecessary (from any taskset), perhaps with better names. With just a flag you would avoid the exception from the empty heap, but you still might enable speculation prematurely, as you really haven't finished enough tasks for SPECULATION_QUANTILE:

if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
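
A sketch of that split inside TaskSetManager (the names are placeholders, as noted above; this is not what was ultimately merged):

// Hypothetical counters, per the suggestion above -- not the merged fix.
// tasksCompletedSuccessfully: tasks that finished in *this* taskset, so a
//   duration was inserted into successfulTaskDurations for each one.
// tasksNoLongerNecessary: partitions completed by *any* taskset of the stage.
private var tasksCompletedSuccessfully = 0
private var tasksNoLongerNecessary = 0

override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // Gate on the counter that matches the heap's contents, so the
  // SPECULATION_QUANTILE threshold is computed over real duration samples.
  if (tasksCompletedSuccessfully >= minFinishedForSpeculation &&
      tasksCompletedSuccessfully > 0) {
    val medianDuration = successfulTaskDurations.median
    // ... existing speculation logic ...
  }
  // ...
  false
}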

@tgravescs (Contributor):

I assume the real issue is that it isn't updating successfulTaskDurations? MedianHeap is a collection; can you please update the description and title to be more explicit?

@tgravescs (Contributor):

In this case one of the older stage attempts (that is a zombie) marked the task as successful, but then the newest stage attempt checked to see if it needed to speculate. Is that correct?

Ideally, I think for speculation we want to look at the task time for all stage attempts, but that is probably a bigger change than this. If we aren't doing that, then I think ignoring it for speculation is OK. Otherwise, how hard is it to send the actual task info in here so it could use the real time the successful task took?

@squito (Contributor) commented Jul 6, 2018

> Ideally I think for speculation we want to look at the task time for all stage attempts. But that is probably a bigger change than this.

Yeah, I agree on both points. One thing which is a little tricky is that you probably want to make sure you're only counting times from different partitions -- you might get times from the same partition from multiple attempts, and those shouldn't count. (Or maybe we don't really care that much, as it's just a heuristic anyway...)
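
One hedged way to encode that de-duplication (hypothetical; nothing like this is in the patch) is to track which partitions have already contributed a duration:

import scala.collection.mutable

// Hypothetical guard, not part of this PR: record at most one duration per
// partition, even if several attempts of it succeed across tasksets.
private val partitionsWithRecordedDuration = new mutable.HashSet[Int]

private def maybeRecordDuration(partitionId: Int, durationMs: Long): Unit = {
  // HashSet#add returns true only the first time a partition id is seen.
  if (speculationEnabled && partitionsWithRecordedDuration.add(partitionId)) {
    successfulTaskDurations.insert(durationMs)
  }
}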

private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
  partitionToIndex.get(partitionId).foreach { index =>
    if (!successful(index)) {
      if (speculationEnabled) {
Contributor (review comment):

speculationEnabled && !isZombie

@cxzl25 cxzl25 changed the title [SPARK-24677][Core]MedianHeap is empty when speculation is enabled, causing the SparkContext to stop [SPARK-24677][Core]Avoid NoSuchElementException from MedianHeap Jul 10, 2018
@tgravescs (Contributor):

OK, what did we decide on the time then? I would say for now either ignore it or send down the real time.

@cxzl25 How hard is it to send the actual task info in here so it could use the real time the successful task took? At a glance it doesn't look too hard to add the additional information to the function calls to pass it into markPartitionCompleted.

@cxzl25 (Contributor, Author) commented Jul 10, 2018

@tgravescs This is really not difficult; I just wasn't sure whether we want to ignore it or send down the real time. I have now submitted a change that uses the actual time of the successful task.
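
Roughly, the shape of that change (a sketch; the exact signatures in the merged commit may differ): handleSuccessfulTask already has the finished TaskInfo, so it is threaded through the scheduler into every TaskSetManager for the stage:

// Sketch of the merged direction: pass the succeeded attempt's TaskInfo down
// so its real duration lands in each manager's MedianHeap.
private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
  partitionToIndex.get(partitionId).foreach { index =>
    if (!successful(index)) {
      if (speculationEnabled && !isZombie) {
        // Use the actual time the successful (other-taskset) attempt took.
        successfulTaskDurations.insert(taskInfo.duration)
      }
      tasksSuccessful += 1
      successful(index) = true
      // ... existing bookkeeping ...
    }
  }
}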

@jiangxb1987 (Contributor):

The changes LGTM

@SparkQA commented Jul 10, 2018

Test build #92817 has finished for PR 21656 at commit d8fdceb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 10, 2018

Test build #92820 has finished for PR 21656 at commit 1c1df5c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor):

@squito are you ok with this approach?

@tgravescs (Contributor):

+1

@squito (Contributor) commented Jul 17, 2018

lgtm

asfgit pushed a commit that referenced this pull request Jul 18, 2018 (the commit message repeats the PR description above).

Author: sychen <[email protected]>

Closes #21656 from cxzl25/fix_MedianHeap_empty.

(cherry picked from commit c8bee93)
Signed-off-by: Thomas Graves <[email protected]>
@asfgit asfgit closed this in c8bee93 Jul 18, 2018
asfgit pushed a second commit that referenced this pull request Jul 18, 2018 (same commit message; cherry picked from commit c8bee93, signed off by Thomas Graves).
@tgravescs
Copy link
Contributor

merged thanks @cxzl25

MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018 (same commit message; cherry picked from commit c8bee93).
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 26, 2019 (same commit message; cherry picked from commit c8bee93).
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 27, 2019 (same commit message; cherry picked from commit c8bee93).
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023 (Ref: LIHADOOP-52383; same commit message).