Commit bf578de

Removed throwable field from FetchFailedException and added MetadataFetchFailedException
FetchFailedException used to have a Throwable field, but in reality we never propagate any of those throwables/exceptions back to the driver, because Executor explicitly looks for FetchFailedException and then sends FetchFailed as the TaskEndReason. This pull request removes the throwable and adds a MetadataFetchFailedException that extends FetchFailedException (so MapOutputTracker now throws MetadataFetchFailedException instead).

Author: Reynold Xin <[email protected]>

Closes #1227 from rxin/metadataFetchException and squashes the following commits:

5cb1e0a [Reynold Xin] MetadataFetchFailedException extends FetchFailedException.
8861ee2 [Reynold Xin] Throw MetadataFetchFailedException in MapOutputTracker.
1 parent 981bde9 commit bf578de

File tree: 7 files changed, +42 −27 lines
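To make the new shape concrete before the per-file diffs, here is a minimal, self-contained Scala sketch of the exception hierarchy this commit introduces and the executor-side catch it enables. BlockManagerId, TaskEndReason, and FetchFailed are stand-in stubs for the real org.apache.spark types; only the two exception classes mirror the actual diff below.

// Stand-in stubs for the real org.apache.spark types; illustrative only.
case class BlockManagerId(host: String, port: Int)
sealed trait TaskEndReason
case class FetchFailed(
    bmAddress: BlockManagerId,  // can be null, see TaskEndReason.scala below
    shuffleId: Int,
    mapId: Int,
    reduceId: Int)
  extends TaskEndReason

// After this commit the exception carries only fetch coordinates, no Throwable.
class FetchFailedException(
    val bmAddress: BlockManagerId,
    val shuffleId: Int,
    val mapId: Int,
    val reduceId: Int)
  extends Exception {

  override def getMessage: String =
    "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)

  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
}

// Metadata failures reuse the same hierarchy: null address, mapId = -1.
class MetadataFetchFailedException(shuffleId: Int, reduceId: Int, message: String)
  extends FetchFailedException(null, shuffleId, -1, reduceId) {
  override def getMessage: String = message
}

object ExecutorSketch {
  def main(args: Array[String]): Unit = {
    // The executor-side pattern: one catch covers both exception types, and
    // only the TaskEndReason travels back to the driver.
    try {
      throw new MetadataFetchFailedException(2, 0, "Missing all output locations for shuffle 2")
    } catch {
      case ffe: FetchFailedException =>
        println(ffe.toTaskEndReason)  // FetchFailed(null,2,-1,0)
    }
  }
}

The point of the design is that the exception is purely a control-flow signal: the driver only ever sees the FetchFailed reason, so a Throwable payload was dead weight.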

core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Lines changed: 7 additions & 5 deletions

@@ -25,9 +25,11 @@ import scala.concurrent.Await
 
 import akka.actor._
 import akka.pattern.ask
+
+import org.apache.spark.util._
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util._
 
 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
           return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
         }
       } else {
-        throw new FetchFailedException(null, shuffleId, -1, reduceId,
-          new Exception("Missing all output locations for shuffle " + shuffleId))
+        throw new MetadataFetchFailedException(
+          shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
       }
     } else {
       statuses.synchronized {
@@ -371,8 +373,8 @@ private[spark] object MapOutputTracker {
     statuses.map {
       status =>
         if (status == null) {
-          throw new FetchFailedException(null, shuffleId, -1, reduceId,
-            new Exception("Missing an output location for shuffle " + shuffleId))
+          throw new MetadataFetchFailedException(
+            shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
        } else {
          (status.location, decompressSize(status.compressedSizes(reduceId)))
        }
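For orientation, the second and third hunks live in MapOutputTracker.getServerStatuses and MapOutputTracker.convertMapStatuses respectively. A simplified sketch of the convertMapStatuses pattern, reusing the stub classes from the sketch above (MapStatus is a stub here, and the real size decompression is elided):

// Simplified stand-in for org.apache.spark.scheduler.MapStatus.
case class MapStatus(location: BlockManagerId, compressedSizes: Array[Byte])

// Illustrative only: a null status means that map output is lost, so fail the
// whole conversion with a metadata exception (no address to blame, mapId = -1).
def convertMapStatuses(
    shuffleId: Int,
    reduceId: Int,
    statuses: Array[MapStatus]): Array[(BlockManagerId, Byte)] = {
  statuses.map { status =>
    if (status == null) {
      throw new MetadataFetchFailedException(
        shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
    } else {
      (status.location, status.compressedSizes(reduceId))  // real code decompresses the size
    }
  }
}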

core/src/main/scala/org/apache/spark/TaskEndReason.scala
Lines changed: 1 addition & 1 deletion

@@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason {
  */
 @DeveloperApi
 case class FetchFailed(
-    bmAddress: BlockManagerId,
+    bmAddress: BlockManagerId, // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
     reduceId: Int)
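The new comment matters because downstream consumers pattern-match on FetchFailed and must tolerate a null address. A hedged illustration, using the stub types from the first sketch rather than the real DAGScheduler logic:

// Illustrative handling only; the real logic lives in DAGScheduler.
def describeFailure(reason: TaskEndReason): String = reason match {
  case FetchFailed(null, shuffleId, _, reduceId) =>
    // metadata fetch failure: no block manager to blame
    s"lost map outputs for shuffle $shuffleId (reduce $reduceId)"
  case FetchFailed(bm, shuffleId, mapId, reduceId) =>
    s"failed to fetch shuffle $shuffleId map $mapId reduce $reduceId from $bm"
  case other =>
    s"non-fetch failure: $other"
}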

core/src/main/scala/org/apache/spark/executor/Executor.scala
Lines changed: 1 addition & 1 deletion

@@ -26,8 +26,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util.{AkkaUtils, Utils}
 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
Lines changed: 4 additions & 0 deletions

@@ -21,6 +21,10 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.util.SerializableBuffer
 
+/**
+ * Description of a task that gets passed onto executors to be executed, usually created by
+ * [[TaskSetManager.resourceOffer]].
+ */
 private[spark] class TaskDescription(
     val taskId: Long,
     val executorId: String,

core/src/main/scala/org/apache/spark/FetchFailedException.scala renamed to core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
Lines changed: 25 additions & 18 deletions

@@ -15,31 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.spark
+package org.apache.spark.shuffle
 
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.{FetchFailed, TaskEndReason}
 
+/**
+ * Failed to fetch a shuffle block. The executor catches this exception and propagates it
+ * back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage.
+ *
+ * Note that bmAddress can be null.
+ */
 private[spark] class FetchFailedException(
-    taskEndReason: TaskEndReason,
-    message: String,
-    cause: Throwable)
+    bmAddress: BlockManagerId,
+    shuffleId: Int,
+    mapId: Int,
+    reduceId: Int)
   extends Exception {
 
-  def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
-      cause: Throwable) =
-    this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
-      "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
-      cause)
-
-  def this (shuffleId: Int, reduceId: Int, cause: Throwable) =
-    this(FetchFailed(null, shuffleId, -1, reduceId),
-      "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause)
-
-  override def getMessage(): String = message
+  override def getMessage: String =
+    "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
 
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+}
 
-  override def getCause(): Throwable = cause
-
-  def toTaskEndReason: TaskEndReason = taskEndReason
+/**
+ * Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]].
+ */
+private[spark] class MetadataFetchFailedException(
+    shuffleId: Int,
+    reduceId: Int,
+    message: String)
+  extends FetchFailedException(null, shuffleId, -1, reduceId) {
 
+  override def getMessage: String = message
 }
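A quick usage demonstration of the two getMessage behaviors, again against the stub classes from the first sketch (so the printed BlockManagerId formatting and the port number are only indicative):

val fetchFail = new FetchFailedException(BlockManagerId("host-a", 7337), 2, 3, 0)
println(fetchFail.getMessage)   // Fetch failed: BlockManagerId(host-a,7337) 2 3 0

val metaFail = new MetadataFetchFailedException(2, 0, "Missing all output locations for shuffle 2")
println(metaFail.getMessage)    // Missing all output locations for shuffle 2

// Subtyping means one `catch` or `intercept` on FetchFailedException covers both.
println(metaFail.isInstanceOf[FetchFailedException])  // true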

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
Lines changed: 3 additions & 2 deletions

@@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
+import org.apache.spark._
 import org.apache.spark.executor.ShuffleReadMetrics
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
 import org.apache.spark.util.CompletionIterator
-import org.apache.spark._
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       blockId match {
         case ShuffleBlockId(shufId, mapId, _) =>
           val address = statuses(mapId.toInt)._1
-          throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
+          throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
         case _ =>
           throw new SparkException(
             "Failed to get block " + blockId + ", which is not a shuffle block")

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Lines changed: 1 addition & 0 deletions

@@ -24,6 +24,7 @@ import akka.testkit.TestActorRef
 import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
 