
Conversation

@liupc

@liupc liupc commented Feb 17, 2020

What changes were proposed in this pull request?

As described in SPARK-30849, a Spark application will sometimes fail because it cannot fetch the mapStatuses broadcast block:

Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, most recent failure: Lost task 18.3 in stage 2.0 (TID 13819, xxxx , executor 8): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
	at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
	at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
	at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
	at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
	at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)

This is caused by the driver invalidating (destroying) the mapStatuses broadcast right after sending the broadcast id to an executor, but before the executor has actually fetched the broadcast block.

This PR fixes the issue by converting the resulting error into a fetch failure so that the stage can be retried.
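Below is a minimal sketch of the direction taken here, assuming the code lives inside the org.apache.spark package (as MapOutputTracker does, since MetadataFetchFailedException is package-private); the helper name asMetadataFetchFailure and the -1 reduceId placeholder are illustrative, not the exact patch:

import java.io.IOException
import scala.collection.JavaConverters._
import com.google.common.base.Throwables
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockNotFoundException

// Wrap a map-status fetch so that a broadcast destroyed by the driver surfaces as a
// metadata fetch failure (which triggers a stage retry) instead of a plain IOException
// that is counted as an ordinary task failure.
def asMetadataFetchFailure[T](shuffleId: Int)(fetchStatuses: => T): T = {
  try {
    fetchStatuses  // e.g. getStatuses(shuffleId, conf) plus deserialization
  } catch {
    case e: IOException if
        Throwables.getCausalChain(e).asScala.exists(_.isInstanceOf[BlockNotFoundException]) =>
      throw new MetadataFetchFailedException(shuffleId, -1,
        s"Broadcast'ed map statuses for shuffle $shuffleId were destroyed: ${e.getMessage}")
  }
}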

Why are the changes needed?

Bugfix

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

Member

@Ngone51 Ngone51 left a comment


This looks like a valid concern to me.

After fetching a broadcast'ed MapStatus from the driver, that MapStatus can be destroyed concurrently when an error happens (e.g., a FetchFailed exception from a concurrent task would invalidate it). So, by the time we call value on the broadcast'ed MapStatus, it fails with an uncaught exception from Broadcast (saying the block has been lost) and fails the job.

We should catch this exception and throw FetchFailed instead, so that the stage can re-run.

But after reading the JIRA ticket and the error log, I'm also surprised: how does the same task hit this issue 4 times in a row? Is it just a coincidence, or am I missing something? @liupc

also cc @cloud-fan @jiangxb1987

s"partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId, conf)
try {
val statuses = getStatuses(shuffleId, conf)

This comment was marked as spam.

@jiangxb1987
Contributor

Could you try to reproduce this issue on master branch first?

@liupc
Author

liupc commented Feb 24, 2020

But after reading the JIRA ticket and the error log, I'm also surprised: how does the same task hit this issue 4 times in a row? Is it just a coincidence, or am I missing something?

Currently, when registerMapOutput is called we invalidate the cached mapStatus broadcast, which means the broadcast is destroyed each time a retried task of the parent stage finishes. So if several tasks of the parent stage are retried, there is a high chance that we will encounter this issue.
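To make the timing concrete, here is a toy, Spark-free sketch of the race being described (all names are illustrative, not Spark APIs): a "driver" thread keeps replacing and destroying a cached broadcast, the way registerMapOutput invalidates it when retried parent-stage tasks finish, while an "executor" thread grabs a reference to the current broadcast and only dereferences it a moment later.

import java.util.concurrent.atomic.AtomicReference

object BroadcastRaceSketch {
  // Stand-in for a broadcast holding serialized map statuses.
  class CachedBroadcast(val id: Int) {
    @volatile var destroyed = false
    def value: String =
      if (destroyed) throw new java.io.IOException(s"Failed to get broadcast_$id")
      else s"mapStatuses from broadcast_$id"
  }

  def main(args: Array[String]): Unit = {
    val cache = new AtomicReference(new CachedBroadcast(0))

    // "Driver": every finished (retried) map task re-registers its output and
    // destroys the previously cached broadcast.
    val driver = new Thread(() => (1 to 1000).foreach { i =>
      val old = cache.getAndSet(new CachedBroadcast(i))
      old.destroyed = true
    })

    // "Executor": gets the current broadcast, then reads its value slightly later;
    // the destroy above can slip in between the two steps.
    val executor = new Thread(() => (1 to 1000).foreach { _ =>
      val b = cache.get()
      Thread.`yield`()
      try b.value catch { case e: java.io.IOException => println(e.getMessage) }
    })

    driver.start(); executor.start()
    driver.join(); executor.join()
  }
}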

@liupc
Copy link
Author

liupc commented Feb 24, 2020

Could you try to reproduce this issue on master branch first?

Yes, I found this case in an earlier Spark version, but I checked the latest code and it doesn't seem to be fixed. I will try to reproduce this issue on the master branch.

@Ngone51
Member

Ngone51 commented Feb 24, 2020

So if several tasks of the parent stage are retried, there is a high chance that we will encounter this issue.

But the max task failure count applies to the same task, rather than to several tasks in the same stage?

@liupc
Author

liupc commented Mar 5, 2020

But the max task failure count applies to the same task, rather than to several tasks in the same stage?

Yes. I mean the fetch failure from the executor may cause several map statuses to be removed and recomputed, so several tasks will be re-executed in the parent stage. Each time one of those tasks finishes, the cached mapStatuses broadcast is invalidated, so the task of child stage 2 described in the JIRA may repeatedly encounter the IOException. @Ngone51 I will try to write some tests or do some hacking to reproduce this issue.

@Ngone51
Member

Ngone51 commented Mar 5, 2020

Ok, I get your point now. Let me paraphrase it to see if I understand correctly:

Assuming we have stage0 finished while stage1 and stage2 are running concurrently and both depend on stage0.

A task from stage1 hits a FetchFailedException and causes stage0 to re-run. At the same time, task X in stage2 is still running. Since multiple tasks from stage0 are running at the same time, and each finished stage0 task invalidates the cached map status (destroying the broadcast), task X is very likely to hit the IOException (a.k.a. "Failed to get broadcast") after fetching the broadcasted map status from the driver, because stage0 tasks keep destroying the broadcast at the same time.

Also, on the TaskSetManager side, the exception is treated as a counted task failure (rather than a FetchFailed), so the task is retried and hits the same exception again and again.

@cloud-fan
Contributor

Is it possible to add a UT for it?

@liupc
Author

liupc commented Mar 9, 2020

Ok, I get your point now. Let me paraphrase it to see if I understand correctly:

Assuming we have stage0 finished while stage1 and stage2 are running concurrently and both depend on stage0.

A task from stage1 hits a FetchFailedException and causes stage0 to re-run. At the same time, task X in stage2 is still running. Since multiple tasks from stage0 are running at the same time, and each finished stage0 task invalidates the cached map status (destroying the broadcast), task X is very likely to hit the IOException (a.k.a. "Failed to get broadcast") after fetching the broadcasted map status from the driver, because stage0 tasks keep destroying the broadcast at the same time.

Also, on the TaskSetManager side, the exception is treated as a counted task failure (rather than a FetchFailed), so the task is retried and hits the same exception again and again.

That's it! Thanks for reviewing @Ngone51

@cloud-fan
Contributor

ok to test

} catch {
case e: IOException if
Throwables.getCausalChain(e).asScala.exists(_.isInstanceOf[BlockNotFoundException]) =>
mapStatuses.clear()
Contributor

Is it OK to clear out all the map status? Shouldn't we only drop the data of the current shuffle id?

Author

@liupc liupc Mar 10, 2020

@cloud-fan Good question! Yes, it's OK to clear all the map statuses, but I think dropping only the data of the current shuffle id would be enough. However, it seems we currently bind a global epoch to the MapOutputTracker: if one stage hits a FetchFailed, the epoch is updated, which clears the whole map statuses cache on the executor side.
Should we change this behavior? If so, maybe we can put up another PR for that.
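For reference, a rough and deliberately simplified sketch (not the actual Spark source) of the epoch behavior mentioned above: the driver bumps a global epoch whenever a fetch failure is registered, and the executor-side tracker drops its entire cached map-status table once it sees a newer epoch, not just the entry for the shuffle that failed.

import scala.collection.concurrent.TrieMap

// Simplified stand-in for the executor-side map-status cache keyed by shuffle id.
class EpochedMapStatusCache[S] {
  private val mapStatuses = new TrieMap[Int, Array[S]]()
  private var epoch = 0L
  private val epochLock = new AnyRef

  // Called when a task arrives carrying a newer epoch from the driver.
  def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      mapStatuses.clear()  // the whole cache is dropped, not only the failed shuffle
    }
  }

  def put(shuffleId: Int, statuses: Array[S]): Unit = { mapStatuses.put(shuffleId, statuses); () }
  def get(shuffleId: Int): Option[Array[S]] = mapStatuses.get(shuffleId)
}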

Member

@Ngone51 Ngone51 Mar 10, 2020

I think there's an implicit assumption that shuffle data are always randomly and evenly placed on nodes. That means any shuffle fetch failure can imply potential fetch failures for other shuffles in the future. So, currently, we always clear mapStatuses when a fetch failure happens.

Contributor

But here it's the broadcast-being-invalidated issue. I don't think that usually happens for a lot of shuffles at the same time.

@SparkQA

SparkQA commented Mar 9, 2020

Test build #119565 has finished for PR 27604 at commit 0a51c15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

We need to write a UT for this case.

fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
} catch {
case e: IOException if
Throwables.getCausalChain(e).asScala.exists(_.isInstanceOf[BlockNotFoundException]) =>
Member

nit: indent.

Member

Shall we logError here?

Author

Maybe we should print the logs in the Executor's exception handling? I checked the code, and it seems it doesn't print any logs for FetchFailedException now?

case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>

Member

Maybe we should print the logs in the Executor's exception handling?

I don't understand what you mean by this.

I checked the code, and it seems it doesn't print any logs for FetchFailedException now?

Yeah, but I think this one is different. I'd like to have a way to distinguish these two kinds of fetch failure.

Author

@liupc liupc Mar 10, 2020

The MetadataFetchFailedException already contains the root cause message, so I think we can just print the logs when handling the fetch failure exception in the Executor class? What do you think?

Member

The MetadataFetchFailedException already contains the root cause message

Oh, I missed that... then it should be fine.

@liupc
Author

liupc commented Mar 10, 2020

We need to write a UT for this case.

I will add a UT later. @jiangxb1987

@SparkQA

SparkQA commented Mar 10, 2020

Test build #119603 has finished for PR 27604 at commit c8826c1.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

Should we also update

val statuses = getStatuses(shuffleId, conf)
?

@liupc
Author

liupc commented Mar 20, 2020

Should we also update

val statuses = getStatuses(shuffleId, conf)

?

What do you mean?

@jiangxb1987
Contributor

The getStatuses method can now throw a MetadataFetchFailedException, so you should move every call to it into a try...catch block.

@liupc
Author

liupc commented May 14, 2020

The getStatuses method can now throw a MetadataFetchFailedException, so you should move every call to it into a try...catch block.

Done, thanks for the review @jiangxb1987

@SparkQA

SparkQA commented May 14, 2020

Test build #122608 has finished for PR 27604 at commit e9c46ca.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 15, 2020

Test build #124026 has finished for PR 27604 at commit 2e11d1b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
