From a3bca65e2010751cae8e6f7b3e8e7be22bdb35d9 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Thu, 30 Oct 2014 20:42:08 +0800
Subject: [PATCH 1/8] Send the fetch failure message back to Web UI

---
 .../org/apache/spark/TaskEndReason.scala      |  6 +-
 .../apache/spark/scheduler/DAGScheduler.scala |  4 +-
 .../apache/spark/scheduler/JobLogger.scala    |  2 +-
 .../spark/shuffle/FetchFailedException.scala  |  9 +-
 .../hash/BlockStoreShuffleFetcher.scala       | 12 +--
 .../storage/ShuffleBlockFetcherIterator.scala | 82 ++++++++++++-------
 .../org/apache/spark/util/JsonProtocol.scala  |  6 +-
 .../scala/org/apache/spark/util/Utils.scala   |  2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala   | 10 +--
 .../ShuffleBlockFetcherIteratorSuite.scala    |  8 +-
 .../ui/jobs/JobProgressListenerSuite.scala    |  2 +-
 .../apache/spark/util/JsonProtocolSuite.scala |  3 +-
 12 files changed, 89 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 8f0c5e78416c..72e7b2e080d0 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -69,11 +69,13 @@ case class FetchFailed(
     bmAddress: BlockManagerId, // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
-    reduceId: Int)
+    reduceId: Int,
+    message: String)
   extends TaskFailedReason {
   override def toErrorString: String = {
     val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
-    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
+      s"message=\n$message\n)"
   }
 }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f81fa6d8089f..941e8c4db835 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1050,7 +1050,7 @@ class DAGScheduler(
         logInfo("Resubmitted " + task + ", so marking it as still running")
         stage.pendingTasks += task

-      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
         val failedStage = stageIdToStage(task.stageId)
         val mapStage = shuffleToMapStage(shuffleId)

@@ -1060,7 +1060,7 @@ class DAGScheduler(
         if (runningStages.contains(failedStage)) {
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
-          markStageAsFinished(failedStage, Some("Fetch failure"))
+          markStageAsFinished(failedStage, Some(failureMessage))
           runningStages -= failedStage
         }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 54904bffdf10..4e3d9de54078 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -215,7 +215,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
         taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
                       " STAGE_ID=" + taskEnd.stageId
         stageLogInfo(taskEnd.stageId, taskStatus)
-      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
         taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
                       taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
                       mapId + " REDUCE_ID=" + reduceId

diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index 71c08e9d5a8c..b01bdea350d1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle

 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.{FetchFailed, TaskEndReason}
+import org.apache.spark.util.Utils

 /**
  * Failed to fetch a shuffle block. The executor catches this exception and propagates it
@@ -30,13 +31,15 @@ private[spark] class FetchFailedException(
     bmAddress: BlockManagerId,
     shuffleId: Int,
     mapId: Int,
-    reduceId: Int)
+    reduceId: Int,
+    message: String)
   extends Exception {

   override def getMessage: String =
     "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)

-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+  def toTaskEndReason: TaskEndReason =
+    FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
 }

 /**
@@ -46,7 +49,7 @@ private[spark] class MetadataFetchFailedException(
     shuffleId: Int,
     reduceId: Int,
     message: String)
-  extends FetchFailedException(null, shuffleId, -1, reduceId) {
+  extends FetchFailedException(null, shuffleId, -1, reduceId, message) {

   override def getMessage: String = message
 }

diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 6cf9305977a3..4b4cafc67e1e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -19,12 +19,13 @@ package org.apache.spark.shuffle.hash

 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}

 import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{Utils, CompletionIterator}

 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -52,18 +53,19 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
     }

-    def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
+    def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
       val blockId = blockPair._1
       val blockOption = blockPair._2
       blockOption match {
-        case Some(block) => {
+        case Success(block) => {
           block.asInstanceOf[Iterator[T]]
         }
-        case None => {
+        case Failure(e) => {
           blockId match {
             case ShuffleBlockId(shufId, mapId, _) =>
               val address = statuses(mapId.toInt)._1
-              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
+              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
+                Utils.exceptionString(e))
             case _ =>
               throw new SparkException(
                 "Failed to get block " + blockId + ", which is not a shuffle block")

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 0d6f3bf003a9..6f470a488690 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.util.concurrent.LinkedBlockingQueue

 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
+import scala.util.{Failure, Success, Try}

 import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
@@ -54,7 +55,7 @@ final class ShuffleBlockFetcherIterator(
     blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
     serializer: Serializer,
     maxBytesInFlight: Long)
-  extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
+  extends Iterator[(BlockId, Try[Iterator[Any]])] with Logging {

   import ShuffleBlockFetcherIterator._

@@ -117,16 +118,18 @@ final class ShuffleBlockFetcherIterator(
   private[this] def cleanup() {
     isZombie = true
     // Release the current buffer if necessary
-    if (currentResult != null && !currentResult.failed) {
-      currentResult.buf.release()
+    currentResult match {
+      case SuccessFetchResult(_, _, buf) => buf.release()
+      case _ =>
     }

     // Release buffers in the results queue
     val iter = results.iterator()
     while (iter.hasNext) {
       val result = iter.next()
-      if (!result.failed) {
-        result.buf.release()
+      result match {
+        case SuccessFetchResult(_, _, buf) => buf.release()
+        case _ =>
       }
     }
   }
@@ -149,7 +152,7 @@ final class ShuffleBlockFetcherIterator(
             // Increment the ref count because we need to pass this to a different thread.
             // This needs to be released after use.
             buf.retain()
-            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId), buf))
+            results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
             shuffleMetrics.remoteBytesRead += buf.size
             shuffleMetrics.remoteBlocksFetched += 1
           }
@@ -158,7 +161,7 @@ final class ShuffleBlockFetcherIterator(

         override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
           logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
-          results.put(new FetchResult(BlockId(blockId), -1, null))
+          results.put(new FailureFetchResult(BlockId(blockId), e))
         }
       }
     )
@@ -229,12 +232,12 @@ final class ShuffleBlockFetcherIterator(
         val buf = blockManager.getBlockData(blockId)
         shuffleMetrics.localBlocksFetched += 1
         buf.retain()
-        results.put(new FetchResult(blockId, 0, buf))
+        results.put(new SuccessFetchResult(blockId, 0, buf))
       } catch {
         case e: Exception =>
           // If we see an exception, stop immediately.
           logError(s"Error occurred while fetching local blocks", e)
-          results.put(new FetchResult(blockId, -1, null))
+          results.put(new FailureFetchResult(blockId, e))
           return
       }
     }
@@ -265,15 +268,17 @@ final class ShuffleBlockFetcherIterator(

   override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

-  override def next(): (BlockId, Option[Iterator[Any]]) = {
+  override def next(): (BlockId, Try[Iterator[Any]]) = {
     numBlocksProcessed += 1
     val startFetchWait = System.currentTimeMillis()
     currentResult = results.take()
     val result = currentResult
     val stopFetchWait = System.currentTimeMillis()
     shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
-    if (!result.failed) {
-      bytesInFlight -= result.size
+
+    result match {
+      case SuccessFetchResult(_, size, _) => bytesInFlight -= size
+      case _ =>
     }
     // Send fetch requests up to maxBytesInFlight
     while (fetchRequests.nonEmpty &&
@@ -281,20 +286,21 @@ final class ShuffleBlockFetcherIterator(
       sendRequest(fetchRequests.dequeue())
     }

-    val iteratorOpt: Option[Iterator[Any]] = if (result.failed) {
-      None
-    } else {
-      val is = blockManager.wrapForCompression(result.blockId, result.buf.createInputStream())
-      val iter = serializer.newInstance().deserializeStream(is).asIterator
-      Some(CompletionIterator[Any, Iterator[Any]](iter, {
-        // Once the iterator is exhausted, release the buffer and set currentResult to null
-        // so we don't release it again in cleanup.
-        currentResult = null
-        result.buf.release()
-      }))
+    val iteratorTry: Try[Iterator[Any]] = result match {
+      case FailureFetchResult(_, e) => Failure(e)
+      case SuccessFetchResult(blockId, _, buf) => {
+        val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
+        val iter = serializer.newInstance().deserializeStream(is).asIterator
+        Success(CompletionIterator[Any, Iterator[Any]](iter, {
+          // Once the iterator is exhausted, release the buffer and set currentResult to null
+          // so we don't release it again in cleanup.
+          currentResult = null
+          buf.release()
+        }))
+      }
     }

-    (result.blockId, iteratorOpt)
+    (result.blockId, iteratorTry)
   }
 }
@@ -313,14 +319,30 @@ object ShuffleBlockFetcherIterator {
   }

   /**
-   * Result of a fetch from a remote block. A failure is represented as size == -1.
+   * Result of a fetch from a remote block.
+   */
+  trait FetchResult {
+    val blockId: BlockId
+  }
+
+  /**
+   * Result of a fetch from a remote block successfully.
    * @param blockId block id
    * @param size estimated size of the block, used to calculate bytesInFlight.
-   *             Note that this is NOT the exact bytes. -1 if failure is present.
-   * @param buf [[ManagedBuffer]] for the content. null is error.
+   *             Note that this is NOT the exact bytes.
+   * @param buf [[ManagedBuffer]] for the content.
+   */
+  case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
+    extends FetchResult {
+    require(buf != null)
+    require(size >= 0)
+  }
+
+  /**
+   * Result of a fetch from a remote block unsuccessfully.
+   * @param blockId block id
+   * @param e the failure exception
    */
-  case class FetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer) {
-    def failed: Boolean = size == -1
-    if (failed) assert(buf == null) else assert(buf != null)
+  case class FailureFetchResult(blockId: BlockId, e: Throwable) extends FetchResult {
   }
 }

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 5b2e7d3a7edb..7cbb3b56af11 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -279,7 +279,8 @@ private[spark] object JsonProtocol {
         ("Block Manager Address" -> blockManagerAddress) ~
         ("Shuffle ID" -> fetchFailed.shuffleId) ~
         ("Map ID" -> fetchFailed.mapId) ~
-        ("Reduce ID" -> fetchFailed.reduceId)
+        ("Reduce ID" -> fetchFailed.reduceId) ~
+        ("Message" -> fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
         val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
         val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
@@ -627,7 +628,8 @@ private[spark] object JsonProtocol {
       val shuffleId = (json \ "Shuffle ID").extract[Int]
       val mapId = (json \ "Map ID").extract[Int]
       val reduceId = (json \ "Reduce ID").extract[Int]
-      new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId)
+      val message = (json \ "Message").extract[String]
+      new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId, message)
     case `exceptionFailure` =>
       val className = (json \ "Class Name").extract[String]
       val description = (json \ "Description").extract[String]

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 063895d3c548..19c87de0355e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1593,7 +1593,7 @@ private[spark] object Utils extends Logging {
   }

   /** Return a nice string representation of the exception, including the stack trace. */
-  def exceptionString(e: Exception): String = {
+  def exceptionString(e: Throwable): String = {
     if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
   }

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a2e4f712db55..819f95634bcd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -431,7 +431,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
       (Success, 42),
-      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
     // this will get called
     // blockManagerMaster.removeExecutor("exec-hostA")
     // ask the scheduler to try it again
@@ -461,7 +461,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null,
       Map[Long, Any](),
       null,
@@ -472,7 +472,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
       null,
       Map[Long, Any](),
       null,
@@ -624,7 +624,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       (Success, makeMapStatus("hostC", 1))))
     // fail the third stage because hostA went down
     complete(taskSets(2), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // have DAGScheduler try again
@@ -655,7 +655,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       (Success, makeMapStatus("hostB", 1))))
     // pretend stage 0 failed because hostA went down
     complete(taskSets(2), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.

diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 4e502cf65e6b..6ffbe0e3e207 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -105,7 +105,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     for (i <- 0 until 5) {
       assert(iterator.hasNext, s"iterator should have 5 elements but actually has $i elements")
       val (blockId, subIterator) = iterator.next()
-      assert(subIterator.isDefined,
+      assert(subIterator.isSuccess,
         s"iterator should have 5 elements defined but actually has $i elements")

       // Make sure we release the buffer once the iterator is exhausted.
@@ -233,8 +233,8 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     sem.acquire()

     // The first block should be defined, and the last two are not defined (due to failure)
-    assert(iterator.next()._2.isDefined === true)
-    assert(iterator.next()._2.isDefined === false)
-    assert(iterator.next()._2.isDefined === false)
+    assert(iterator.next()._2.isSuccess)
+    assert(iterator.next()._2.isFailure)
+    assert(iterator.next()._2.isFailure)
   }
 }

diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 3370dd4156c3..c902c8b9d6a1 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -115,7 +115,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // Go through all the failure cases to make sure we are counting them as failures.
     val taskFailedReasons = Seq(
       Resubmitted,
-      new FetchFailed(null, 0, 0, 0),
+      new FetchFailed(null, 0, 0, 0, "ignored"),
       new ExceptionFailure("Exception", "description", null, None),
       TaskResultLost,
       TaskKilled,

diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f1f88c5fd363..ff3025557fdd 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -107,7 +107,7 @@ class JsonProtocolSuite extends FunSuite {
     testJobResult(jobFailed)

     // TaskEndReason
-    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19)
+    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, "Some exception")
     val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)
@@ -396,6 +396,7 @@ class JsonProtocolSuite extends FunSuite {
         assert(r1.mapId === r2.mapId)
         assert(r1.reduceId === r2.reduceId)
         assertEquals(r1.bmAddress, r2.bmAddress)
+        assert(r1.message === r2.message)
       case (r1: ExceptionFailure, r2: ExceptionFailure) =>
         assert(r1.className === r2.className)
         assert(r1.description === r2.description)

From 0c07d1fa8f472b88b65653b3471b5afba3ee3546 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sat, 1 Nov 2014 12:32:35 +0800
Subject: [PATCH 2/8] Backward-compatible support

---
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 7cbb3b56af11..05c2a5c42fea 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -628,8 +628,9 @@ private[spark] object JsonProtocol {
       val shuffleId = (json \ "Shuffle ID").extract[Int]
       val mapId = (json \ "Map ID").extract[Int]
       val reduceId = (json \ "Reduce ID").extract[Int]
-      val message = (json \ "Message").extract[String]
-      new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId, message)
+      val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
+      new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
+        message.getOrElse("Unknown"))
     case `exceptionFailure` =>
       val className = (json \ "Class Name").extract[String]
       val description = (json \ "Description").extract[String]
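[Note on PATCH 2/8] Reading "Message" through Utils.jsonOption is what keeps event logs written before this change parseable: a missing field becomes None instead of raising an extraction error. Below is a standalone sketch of that behavior, illustration only; it assumes json4s-jackson on the classpath and re-implements jsonOption locally (the real helper lives in org.apache.spark.util.Utils).

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    object JsonCompatSketch {
      implicit val formats: Formats = DefaultFormats

      // Local stand-in for Utils.jsonOption: an absent field parses as JNothing.
      def jsonOption(json: JValue): Option[JValue] = json match {
        case JNothing => None
        case value => Some(value)
      }

      def main(args: Array[String]): Unit = {
        // An event written before this patch carries no "Message" field.
        val oldEvent = parse("""{"Shuffle ID": 17, "Map ID": 18, "Reduce ID": 19}""")
        val message = jsonOption(oldEvent \ "Message").map(_.extract[String])
        println(message.getOrElse("Unknown")) // prints: Unknown
      }
    }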
From 62103fd80a56cb9f2bf072a1b92271be377b5c24 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sat, 1 Nov 2014 23:57:02 +0800
Subject: [PATCH 3/8] Update as per review

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
 .../org/apache/spark/shuffle/FetchFailedException.scala | 7 +++++--
 .../spark/shuffle/hash/BlockStoreShuffleFetcher.scala   | 5 ++---
 .../spark/storage/ShuffleBlockFetcherIterator.scala     | 7 +++----
 .../main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +-
 .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 3 ++-
 6 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 941e8c4db835..201b363c1a4e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1060,7 +1060,7 @@ class DAGScheduler(
         if (runningStages.contains(failedStage)) {
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
-          markStageAsFinished(failedStage, Some(failureMessage))
+          markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
           runningStages -= failedStage
         }

diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index b01bdea350d1..d1a8994e059b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -35,11 +35,14 @@ private[spark] class FetchFailedException(
     message: String)
   extends Exception {

+  def this(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, e: Throwable) {
+    this(bmAddress, shuffleId, mapId, reduceId, Utils.exceptionString(e))
+  }
+
   override def getMessage: String =
     "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)

-  def toTaskEndReason: TaskEndReason =
-    FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
 }

 /**

diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 4b4cafc67e1e..4638b3310ad8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -25,7 +25,7 @@ import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.{Utils, CompletionIterator}
+import org.apache.spark.util.CompletionIterator

 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
           blockId match {
             case ShuffleBlockId(shufId, mapId, _) =>
               val address = statuses(mapId.toInt)._1
-              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
-                Utils.exceptionString(e))
+              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
             case _ =>
               throw new SparkException(
                 "Failed to get block " + blockId + ", which is not a shuffle block")

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 6f470a488690..faf08526bb18 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -321,7 +321,7 @@ object ShuffleBlockFetcherIterator {
   /**
    * Result of a fetch from a remote block.
    */
-  trait FetchResult {
+  sealed trait FetchResult {
     val blockId: BlockId
   }

@@ -332,7 +332,7 @@ object ShuffleBlockFetcherIterator {
    *             Note that this is NOT the exact bytes.
    * @param buf [[ManagedBuffer]] for the content.
    */
-  case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
+  sealed case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
     extends FetchResult {
     require(buf != null)
     require(size >= 0)
@@ -343,6 +343,5 @@ object ShuffleBlockFetcherIterator {
    * @param blockId block id
    * @param e the failure exception
    */
-  case class FailureFetchResult(blockId: BlockId, e: Throwable) extends FetchResult {
-  }
+  sealed case class FailureFetchResult(blockId: BlockId, e: Throwable) extends FetchResult
 }

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 05c2a5c42fea..fae774ccaad7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -630,7 +630,7 @@ private[spark] object JsonProtocol {
       val reduceId = (json \ "Reduce ID").extract[Int]
       val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
       new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
-        message.getOrElse("Unknown"))
+        message.getOrElse("Unknown reason"))
     case `exceptionFailure` =>
       val className = (json \ "Class Name").extract[String]
       val description = (json \ "Description").extract[String]

diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index ff3025557fdd..0872e19808f0 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -107,7 +107,8 @@ class JsonProtocolSuite extends FunSuite {
     testJobResult(jobFailed)

     // TaskEndReason
-    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, "Some exception")
+    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
+      "Some exception")
     val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)
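[Note on PATCH 3/8] Marking FetchResult as sealed lets the compiler verify that the pattern matches in cleanup() and next() cover every subtype. Below is a self-contained sketch of that property; the names mirror the patch, but BlockId and ManagedBuffer are simplified here to String and Array[Byte].

    sealed trait FetchResult { val blockId: String }
    case class SuccessFetchResult(blockId: String, size: Long, buf: Array[Byte]) extends FetchResult
    case class FailureFetchResult(blockId: String, e: Throwable) extends FetchResult

    object FetchResultSketch {
      // Omitting either case below would produce a non-exhaustive match warning
      // at compile time, because the trait is sealed.
      def describe(result: FetchResult): String = result match {
        case SuccessFetchResult(id, size, _) => s"fetched $id ($size bytes)"
        case FailureFetchResult(id, e) => s"failed to fetch $id: ${e.getMessage}"
      }
    }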
From b88c91959218940898fae3cea59ae9f7c6681166 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sun, 2 Nov 2014 13:12:10 +0800
Subject: [PATCH 4/8] Use 'private[storage]' for case classes instead of 'sealed'

---
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index faf08526bb18..989d1b95777c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -332,7 +332,7 @@ object ShuffleBlockFetcherIterator {
    *             Note that this is NOT the exact bytes.
    * @param buf [[ManagedBuffer]] for the content.
    */
-  sealed case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
+  private[storage] case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
     extends FetchResult {
     require(buf != null)
     require(size >= 0)
@@ -343,5 +343,6 @@ object ShuffleBlockFetcherIterator {
    * @param blockId block id
    * @param e the failure exception
    */
-  sealed case class FailureFetchResult(blockId: BlockId, e: Throwable) extends FetchResult
+  private[storage] case class FailureFetchResult(blockId: BlockId, e: Throwable)
+    extends FetchResult
 }

From d51b0b64dcd9a3dad493154d0648a2a1bf42cbd9 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sun, 2 Nov 2014 14:52:03 +0800
Subject: [PATCH 5/8] Set e as the cause of FetchFailedException

---
 .../org/apache/spark/shuffle/FetchFailedException.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index d1a8994e059b..d6bfe8d00188 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -33,16 +33,18 @@ private[spark] class FetchFailedException(
     mapId: Int,
     reduceId: Int,
     message: String)
-  extends Exception {
+  extends Exception(message) {

   def this(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, e: Throwable) {
-    this(bmAddress, shuffleId, mapId, reduceId, Utils.exceptionString(e))
+    this(bmAddress, shuffleId, mapId, reduceId, e.getMessage)
+    initCause(e)
   }

   override def getMessage: String =
     "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)

-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
+    Utils.exceptionString(this))
 }
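[Note on PATCH 5/8] The auxiliary constructor now chains the original Throwable with initCause, so Utils.exceptionString(this) in toTaskEndReason can render the remote stack trace as part of the failure message. Below is a minimal sketch of the pattern; the class is hypothetical, not Spark's FetchFailedException.

    class FetchFailure(message: String) extends Exception(message) {
      def this(e: Throwable) {
        this(e.getMessage)
        initCause(e) // getCause and printStackTrace now include the original error
      }
    }

    object CauseSketch {
      def main(args: Array[String]): Unit = {
        val root = new java.io.IOException("connection reset by peer")
        val wrapped = new FetchFailure(root)
        assert(wrapped.getCause eq root)
        wrapped.printStackTrace() // ends with "Caused by: java.io.IOException: ..."
      }
    }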
From 316767dfb237ab9b1019468e5d039d891c2eef12 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sun, 2 Nov 2014 14:58:45 +0800
Subject: [PATCH 6/8] Add private[storage] to FetchResult

---
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 989d1b95777c..c92e36d5a037 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -321,7 +321,7 @@ object ShuffleBlockFetcherIterator {
   /**
    * Result of a fetch from a remote block.
    */
-  sealed trait FetchResult {
+  private[storage] sealed trait FetchResult {
     val blockId: BlockId
   }

From 4e946f7349db8468375f40fa526c6892f5145759 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sun, 2 Nov 2014 15:51:08 +0800
Subject: [PATCH 7/8] Add e as the cause of SparkException

---
 .../apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 4638b3310ad8..af98159d6346 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -67,7 +67,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
               throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
             case _ =>
               throw new SparkException(
-                "Failed to get block " + blockId + ", which is not a shuffle block")
+                "Failed to get block " + blockId + ", which is not a shuffle block", e)
           }
         }
       }
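[Note on PATCH 7/8] Together with the Try[Iterator[Any]] results introduced in patch 1, the Failure branch of unpackBlock still holds the original exception, which can then travel as the cause of the FetchFailedException or SparkException thrown above. Below is a simplified, self-contained version of that consumption pattern; the types are reduced to String and Iterator[T].

    import scala.util.{Failure, Success, Try}

    object UnpackSketch {
      def unpack[T](blockId: String, block: Try[Iterator[T]]): Iterator[T] = block match {
        case Success(iter) => iter
        // Unlike Option, Try keeps the exception, so the rethrow can name the
        // block and preserve the root cause.
        case Failure(e) => throw new RuntimeException(s"Failed to get block $blockId", e)
      }

      def main(args: Array[String]): Unit = {
        println(unpack("shuffle_0_1_2", Success(Iterator(1, 2, 3))).sum) // 6
      }
    }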
From f7e1fafaabce45f59ae0b742c233d70f2aacde3d Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Mon, 3 Nov 2014 13:14:41 +0800
Subject: [PATCH 8/8] Discard changes for FetchFailedException and minor
 modification

---
 .../spark/shuffle/FetchFailedException.scala    | 16 ++--------------
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |  5 +++--
 2 files changed, 5 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index d6bfe8d00188..0c1b6f4defdb 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -35,16 +35,7 @@ private[spark] class FetchFailedException(
     message: String)
   extends Exception(message) {

-  def this(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, e: Throwable) {
-    this(bmAddress, shuffleId, mapId, reduceId, e.getMessage)
-    initCause(e)
-  }
-
-  override def getMessage: String =
-    "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
-
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
-    Utils.exceptionString(this))
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
 }

 /**
@@ -54,7 +45,4 @@ private[spark] class MetadataFetchFailedException(
     shuffleId: Int,
     reduceId: Int,
     message: String)
-  extends FetchFailedException(null, shuffleId, -1, reduceId, message) {
-
-  override def getMessage: String = message
-}
+  extends FetchFailedException(null, shuffleId, -1, reduceId, message)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index af98159d6346..cda1b88d10da 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -25,7 +25,7 @@ import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{CompletionIterator, Utils}

 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -64,7 +64,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
           blockId match {
             case ShuffleBlockId(shufId, mapId, _) =>
               val address = statuses(mapId.toInt)._1
-              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
+              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
+                Utils.exceptionString(e))
             case _ =>
               throw new SparkException(
                 "Failed to get block " + blockId + ", which is not a shuffle block", e)
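[Note on PATCH 8/8] With the constructor changes reverted, the call site formats the caught exception itself via Utils.exceptionString(e) before the message reaches the Web UI. Below is a rough, illustrative equivalent of that helper; the real implementation in org.apache.spark.util.Utils differs in formatting details.

    object ExceptionStringSketch {
      def exceptionString(e: Throwable): String =
        if (e == null) "" else {
          // Class name and message, followed by one indented line per stack frame.
          val trace = e.getStackTrace.map(elem => s"        $elem").mkString("\n")
          s"${e.getClass.getName}: ${e.getMessage}\n$trace"
        }

      def main(args: Array[String]): Unit = {
        println(exceptionString(new java.io.IOException("connection reset by peer")))
      }
    }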