From d08c20cd1fbb22bb5db191db3d4616e5ed8b6f52 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 6 May 2015 19:49:27 -0500 Subject: [PATCH 01/18] tasks know which stageAttempt they belong to --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- .../main/scala/org/apache/spark/scheduler/ResultTask.scala | 3 ++- .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 5 +++-- core/src/main/scala/org/apache/spark/scheduler/Task.scala | 6 +++++- core/src/test/java/org/apache/spark/JavaAPISuite.java | 2 +- .../test/scala/org/apache/spark/scheduler/FakeTask.scala | 2 +- .../apache/spark/scheduler/NotSerializableFakeTask.scala | 2 +- .../scala/org/apache/spark/scheduler/TaskContextSuite.scala | 4 ++-- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 9 files changed, 18 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5d812918a13d1..5388a5922a374 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -891,7 +891,7 @@ class DAGScheduler( partitionsToCompute.map { id => val locs = getPreferredLocs(stage.rdd, id) val part = stage.rdd.partitions(id) - new ShuffleMapTask(stage.id, taskBinary, part, locs) + new ShuffleMapTask(stage.id, stage.attemptId, taskBinary, part, locs) } case stage: ResultStage => @@ -900,7 +900,7 @@ class DAGScheduler( val p: Int = job.partitions(id) val part = stage.rdd.partitions(p) val locs = getPreferredLocs(stage.rdd, p) - new ResultTask(stage.id, taskBinary, part, locs, id) + new ResultTask(stage.id, stage.attemptId, taskBinary, part, locs, id) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index c9a124113961f..9c2606e278c54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -41,11 +41,12 @@ import org.apache.spark.rdd.RDD */ private[spark] class ResultTask[T, U]( stageId: Int, + stageAttemptId: Int, taskBinary: Broadcast[Array[Byte]], partition: Partition, @transient locs: Seq[TaskLocation], val outputId: Int) - extends Task[U](stageId, partition.index) with Serializable { + extends Task[U](stageId, stageAttemptId, partition.index) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { if (locs == null) Nil else locs.toSet.toSeq diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index bd3dd23dfe1ac..14c8c00961487 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -40,14 +40,15 @@ import org.apache.spark.shuffle.ShuffleWriter */ private[spark] class ShuffleMapTask( stageId: Int, + stageAttemptId: Int, taskBinary: Broadcast[Array[Byte]], partition: Partition, @transient private var locs: Seq[TaskLocation]) - extends Task[MapStatus](stageId, partition.index) with Logging { + extends Task[MapStatus](stageId, stageAttemptId, partition.index) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ def this(partitionId: Int) { - this(0, null, new Partition { override def index: Int = 0 }, null) + this(0, 0, null, new Partition { override def index: Int = 0 }, null) } @transient private val preferredLocs: Seq[TaskLocation] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 586d1e06204c1..b8bd0a5c10370 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -43,7 +43,10 @@ import org.apache.spark.util.Utils * @param stageId id of the stage this task belongs to * @param partitionId index of the number in the RDD */ -private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable { +private[spark] abstract class Task[T]( + val stageId: Int, + val stageAttemptId: Int, + var partitionId: Int) extends Serializable { /** * Called by [[Executor]] to run this task. @@ -55,6 +58,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex final def run(taskAttemptId: Long, attemptNumber: Int): T = { context = new TaskContextImpl( stageId = stageId, + stageAttemptId = stageAttemptId, partitionId = partitionId, taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index c2089b0e56a1f..8718f276a9245 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1009,7 +1009,7 @@ public void persist() { @Test public void iterator() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, new TaskMetrics()); + TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, 0, new TaskMetrics()); Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue()); } diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index 0a7cb69416a08..188dded7c02f7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import org.apache.spark.TaskContext -class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) { +class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 0) { override def runTask(context: TaskContext): Int = 0 override def preferredLocations: Seq[TaskLocation] = prefLocs diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala index 9b92f8de56759..383855caefa2f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala @@ -25,7 +25,7 @@ import org.apache.spark.TaskContext * A Task implementation that fails to serialize. */ private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int) - extends Task[Array[Byte]](stageId, 0) { + extends Task[Array[Byte]](stageId, 0, 0) { override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte] override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]() diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 83ae8701243e5..34602a95e53ec 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -42,8 +42,8 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte } val closureSerializer = SparkEnv.get.closureSerializer.newInstance() val func = (c: TaskContext, i: Iterator[String]) => i.next() - val task = new ResultTask[String, String]( - 0, sc.broadcast(closureSerializer.serialize((rdd, func)).array), rdd.partitions(0), Seq(), 0) + val task = new ResultTask[String, String](0, 0, + sc.broadcast(closureSerializer.serialize((rdd, func)).array), rdd.partitions(0), Seq(), 0) intercept[RuntimeException] { task.run(0, 0) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 6198cea46ddf8..65688124607a7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -137,7 +137,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex /** * A Task implementation that results in a large serialized task. */ -class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) { +class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) { val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) val random = new Random(0) random.nextBytes(randomBuffer) From 89e8428db2441258597e3962905da6317912cc12 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 6 May 2015 22:54:57 -0500 Subject: [PATCH 02/18] reproduce the failure --- .../org/apache/spark/TaskContextImpl.scala | 1 + .../spark/scheduler/DAGSchedulerSuite.scala | 59 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index b4d572cb52313..077fa038bbe69 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -30,6 +30,7 @@ private[spark] class TaskContextImpl( override val attemptNumber: Int, override val taskMemoryManager: TaskMemoryManager, val runningLocally: Boolean = false, + val stageAttemptId: Int = 0, // for testing val taskMetrics: TaskMetrics = TaskMetrics.empty) extends TaskContext with Logging { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6a8ae29aae675..433a5189b22d2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.util.CallSite import org.apache.spark.executor.TaskMetrics @@ -773,6 +774,64 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + test("no concurrent retries for stage attempts (SPARK-7308)") { + // see SPARK-7308 for a detailed description of the conditions this is trying to recreate. + // note that this is somewhat convoluted for a test case, but isn't actually very unusual + // under a real workload. Note that we only fail the first attempt of stage 2, but that + // could be enough to cause havoc. + + val conf = new SparkConf().set("spark.executor.memory", "100m") + val clusterSc = new SparkContext("local-cluster[10,4,100]", "test-cluster", conf) + val bms = ArrayBuffer[BlockManagerId]() + val stageFailureCount = HashMap[Int, Int]() + clusterSc.addSparkListener(new SparkListener { + override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { + bms += blockManagerAdded.blockManagerId + } + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + if (stageCompleted.stageInfo.failureReason.isDefined) { + val stage = stageCompleted.stageInfo.stageId + stageFailureCount(stage) = stageFailureCount.getOrElse(stage, 0) + 1 + } + } + }) + try { + val rawData = clusterSc.parallelize(1 to 1e6.toInt, 500).map{x => (x % 100) -> x}.cache() + rawData.count() + val aBm = bms(0) + val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex{ case (idx, itr) => + // we want one failure quickly, and more failures after stage 0 has finished its + // second attempt + if (TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId == 0) { + if (idx == 0) { + throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) + } else if (idx > 0 && math.random < 0.1) { + Thread.sleep(10000) + throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) + } + } else { + Thread.sleep(10000) + } + Thread.sleep(500) // want to make sure plenty of these finish after task 0 fails + itr.map{x => ((x._1 + 5) % 100) -> x._2 } + } + val shuffledAgain = shuffled.flatMap{ case(k,vs) => vs.map{k -> _}}.groupByKey(100) + val data = shuffledAgain.mapPartitions { itr => + Thread.sleep(10000) + itr.flatMap{_._2} + }.cache().collect() + val count = data.size + assert(count === 1e6.toInt) + assert(data.toSet === (1 to 1e6.toInt).toSet) + // we should only get one failure from stage 2, everything else should be fine + assert(stageFailureCount(2) === 1) + assert(stageFailureCount.getOrElse(1, 0) === 0) + assert(stageFailureCount.getOrElse(3, 0) === 0) + } finally { + clusterSc.stop() + } + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. From 70a787be6e55605365d84490e0d2072d4c7f5143 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 6 May 2015 23:13:23 -0500 Subject: [PATCH 03/18] ignore fetch failure from attempts that are already failed. only a partial fix, still have some concurrent attempts --- .../org/apache/spark/scheduler/DAGScheduler.scala | 11 ++++++++--- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5388a5922a374..d02a4bf35bb3b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1103,9 +1103,14 @@ class DAGScheduler( // multiple tasks running concurrently on different executors). In that case, it is possible // the fetch failure has already been handled by the scheduler. if (runningStages.contains(failedStage)) { - logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + - s"due to a fetch failure from $mapStage (${mapStage.name})") - markStageAsFinished(failedStage, Some(failureMessage)) + if (failedStage.attemptId - 1 > task.stageAttemptId) { + logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + + s" ${task.stageAttemptId}, which has already failed") + } else { + logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + + s"due to a fetch failure from $mapStage (${mapStage.name})") + markStageAsFinished(failedStage, Some(failureMessage)) + } } if (disallowStageRetryForTest) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 433a5189b22d2..7d11c8e316d5b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -826,7 +826,7 @@ class DAGSchedulerSuite // we should only get one failure from stage 2, everything else should be fine assert(stageFailureCount(2) === 1) assert(stageFailureCount.getOrElse(1, 0) === 0) - assert(stageFailureCount.getOrElse(3, 0) === 0) + assert(stageFailureCount.getOrElse(3, 0) <= 2) // TODO this should be 0, bug still exists } finally { clusterSc.stop() } From 7fbcefbdb466daca0f492966492e4d7710247810 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 6 May 2015 23:15:11 -0500 Subject: [PATCH 04/18] ignore the test for now just to avoid swamping jenkins --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 7d11c8e316d5b..3f7d5b6d90d3a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -774,7 +774,7 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } - test("no concurrent retries for stage attempts (SPARK-7308)") { + ignore("no concurrent retries for stage attempts (SPARK-7308)") { // see SPARK-7308 for a detailed description of the conditions this is trying to recreate. // note that this is somewhat convoluted for a test case, but isn't actually very unusual // under a real workload. Note that we only fail the first attempt of stage 2, but that From 2eebbf214bb534c5bd7ebaa9de2be7b3471a497b Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 6 May 2015 23:50:44 -0500 Subject: [PATCH 05/18] style --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3f7d5b6d90d3a..68ec1ed0f24a6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -804,10 +804,12 @@ class DAGSchedulerSuite // second attempt if (TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId == 0) { if (idx == 0) { - throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) + throw new FetchFailedException(aBm, 0, 0, idx, + cause = new RuntimeException("simulated fetch failure")) } else if (idx > 0 && math.random < 0.1) { Thread.sleep(10000) - throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) + throw new FetchFailedException(aBm, 0, 0, idx, + cause = new RuntimeException("simulated fetch failure")) } } else { Thread.sleep(10000) From 7142242547074604c1a7ef9dd701473dd40d4693 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 May 2015 10:52:30 -0500 Subject: [PATCH 06/18] more rigorous test case --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 68ec1ed0f24a6..b5bee5d9760a3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -814,7 +814,9 @@ class DAGSchedulerSuite } else { Thread.sleep(10000) } - Thread.sleep(500) // want to make sure plenty of these finish after task 0 fails + // want to make sure plenty of these finish after task 0 fails, and some even finish + // after the previous stage is retried and this stage retry is started + Thread.sleep((500 + math.random * 5000).toLong) itr.map{x => ((x._1 + 5) % 100) -> x._2 } } val shuffledAgain = shuffled.flatMap{ case(k,vs) => vs.map{k -> _}}.groupByKey(100) @@ -828,7 +830,7 @@ class DAGSchedulerSuite // we should only get one failure from stage 2, everything else should be fine assert(stageFailureCount(2) === 1) assert(stageFailureCount.getOrElse(1, 0) === 0) - assert(stageFailureCount.getOrElse(3, 0) <= 2) // TODO this should be 0, bug still exists + assert(stageFailureCount.getOrElse(3, 0) == 0) } finally { clusterSc.stop() } From ccaa159a0d1a449acf271daa028a53228a879a4c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 May 2015 10:53:09 -0500 Subject: [PATCH 07/18] index file needs to handle cases when data file already exist, and the actual data is in the middle of it --- .../apache/spark/shuffle/IndexShuffleBlockManager.scala | 8 ++++++-- .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala index a1741e2875c16..6ec29296ebca3 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala @@ -76,12 +76,16 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver { * end of the output file. This will be used by getBlockLocation to figure out where each block * begins and ends. * */ - def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = { + def writeIndexFile( + shuffleId: Int, + mapId: Int, + lengths: Array[Long], + initialFileLength: Long): Unit = { val indexFile = getIndexFile(shuffleId, mapId) val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) Utils.tryWithSafeFinally { // We take in lengths of each block, need to convert it to offsets. - var offset = 0L + var offset = initialFileLength out.writeLong(offset) for (length <- lengths) { offset += length diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index a066435df6fb0..bb4db87b79cff 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -66,9 +66,10 @@ private[spark] class SortShuffleWriter[K, V, C]( // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) + val initialFileLength = outputFile.length() val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) - shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) + shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths, initialFileLength) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } From 3585b968f40817feb15c7056e70a4f83a6891012 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 May 2015 11:14:08 -0500 Subject: [PATCH 08/18] pare down the unit test --- .../spark/scheduler/DAGSchedulerSuite.scala | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b5bee5d9760a3..c3d009f186eed 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -781,7 +781,7 @@ class DAGSchedulerSuite // could be enough to cause havoc. val conf = new SparkConf().set("spark.executor.memory", "100m") - val clusterSc = new SparkContext("local-cluster[10,4,100]", "test-cluster", conf) + val clusterSc = new SparkContext("local-cluster[5,4,100]", "test-cluster", conf) val bms = ArrayBuffer[BlockManagerId]() val stageFailureCount = HashMap[Int, Int]() clusterSc.addSparkListener(new SparkListener { @@ -792,11 +792,12 @@ class DAGSchedulerSuite if (stageCompleted.stageInfo.failureReason.isDefined) { val stage = stageCompleted.stageInfo.stageId stageFailureCount(stage) = stageFailureCount.getOrElse(stage, 0) + 1 + println("stage " + stage + " failed: " + stageFailureCount(stage)) } } }) try { - val rawData = clusterSc.parallelize(1 to 1e6.toInt, 500).map{x => (x % 100) -> x}.cache() + val rawData = clusterSc.parallelize(1 to 1e6.toInt, 20).map{x => (x % 100) -> x}.cache() rawData.count() val aBm = bms(0) val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex{ case (idx, itr) => @@ -810,20 +811,16 @@ class DAGSchedulerSuite Thread.sleep(10000) throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) + } else { + // want to make sure plenty of these finish after task 0 fails, and some even finish + // after the previous stage is retried and this stage retry is started + Thread.sleep((500 + math.random * 5000).toLong) } - } else { - Thread.sleep(10000) } - // want to make sure plenty of these finish after task 0 fails, and some even finish - // after the previous stage is retried and this stage retry is started - Thread.sleep((500 + math.random * 5000).toLong) itr.map{x => ((x._1 + 5) % 100) -> x._2 } } - val shuffledAgain = shuffled.flatMap{ case(k,vs) => vs.map{k -> _}}.groupByKey(100) - val data = shuffledAgain.mapPartitions { itr => - Thread.sleep(10000) - itr.flatMap{_._2} - }.cache().collect() + val shuffledAgain = shuffled.flatMap{ case(k,vs) => vs.map(k -> _) }.groupByKey(100) + val data = shuffledAgain.mapPartitions { itr => itr.flatMap(_._2) }.cache().collect() val count = data.size assert(count === 1e6.toInt) assert(data.toSet === (1 to 1e6.toInt).toSet) From de235303c4e708323bbdcf2b6bbc003ad230fbe6 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 May 2015 11:27:49 -0500 Subject: [PATCH 09/18] SparkIllegalStateException if we ever have multiple concurrent attempts for the same stage --- .../main/scala/org/apache/spark/SparkException.scala | 10 ++++++++++ .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 6 ++++++ 2 files changed, 16 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 2ebd7a7151a59..b7c2386fd7d87 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -30,3 +30,13 @@ class SparkException(message: String, cause: Throwable) */ private[spark] class SparkDriverExecutionException(cause: Throwable) extends SparkException("Execution error", cause) + +/** + * Exception indicating an error internal to Spark -- it is in an inconsistent state, not due + * to any error by the user + */ +class SparkIllegalStateException(message: String, cause: Throwable) + extends SparkException(message, cause) { + + def this(message: String) = this(message, null) +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index b4b8a630694bb..d37f02e51e3bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -159,6 +159,12 @@ private[spark] class TaskSchedulerImpl( this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) activeTaskSets(taskSet.id) = manager + val taskSetsPerStage = activeTaskSets.values.filterNot(_.isZombie).groupBy(_.stageId) + taskSetsPerStage.foreach { case (stage, taskSets) => + if (taskSets.size > 1) { + throw new SparkIllegalStateException("more than one active taskSet for stage " + stage) + } + } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { From c91ee10e166f07828bed66146a2bbdd28633fb34 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 May 2015 11:32:39 -0500 Subject: [PATCH 10/18] better unit test --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index c3d009f186eed..712c68de107b7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -807,8 +807,8 @@ class DAGSchedulerSuite if (idx == 0) { throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) - } else if (idx > 0 && math.random < 0.1) { - Thread.sleep(10000) + } else if (idx > 0 && math.random < 0.2) { + Thread.sleep(5000) throw new FetchFailedException(aBm, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) } else { From 05c72fda875a08ee3077ad232796cdf3d54d815f Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 May 2015 17:51:46 -0500 Subject: [PATCH 11/18] handle more cases from bad ordering of task attempt completion --- .../org/apache/spark/MapOutputTracker.scala | 8 +++-- .../apache/spark/scheduler/DAGScheduler.scala | 35 +++++++++++++++++-- .../spark/scheduler/ShuffleMapStage.scala | 20 +++++++++++ .../spark/scheduler/TaskSetManager.scala | 8 +++-- .../shuffle/IndexShuffleBlockManager.scala | 4 +-- .../shuffle/sort/SortShuffleWriter.scala | 2 ++ 6 files changed, 67 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 16072283edbe9..1ecadd6258168 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -378,12 +378,14 @@ private[spark] object MapOutputTracker extends Logging { reduceId: Int, statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { assert (statuses != null) + var idx = -1 statuses.map { status => + idx += 1 if (status == null) { - logError("Missing an output location for shuffle " + shuffleId) - throw new MetadataFetchFailedException( - shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId) + val msg = "Missing an output location for shuffle " + shuffleId + ": map " + idx + logError(msg) + throw new MetadataFetchFailedException(shuffleId, reduceId, msg) } else { (status.location, status.getSizeForBlock(reduceId)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d02a4bf35bb3b..0ced368ec5718 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -830,6 +830,10 @@ class DAGScheduler( logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry stage.pendingTasks.clear() + stage match { + case smt: ShuffleMapStage => smt.clearPartitionComputeCount() + case _ => + } // First figure out the indexes of partition ids to compute. @@ -974,6 +978,7 @@ class DAGScheduler( val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) + // REVIEWERS: does this need special handling for multiple completions of the same task? outputCommitCoordinator.taskCompleted(stageId, task.partitionId, event.taskInfo.attempt, event.reason) @@ -1031,15 +1036,31 @@ class DAGScheduler( case smt: ShuffleMapTask => val shuffleStage = stage.asInstanceOf[ShuffleMapStage] + val computeCount = shuffleStage.incComputeCount(smt.partitionId) updateAccumulators(event) val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { - logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId) + if (computeCount > 1) { + // REVIEWERS: do I need to worry about speculation here, when multiple completion events + // are normal? + + // REVIEWERS: is this really only a problem on a ShuffleMapTask?? does it also cause + // problems for ResultTask? + + // This can happen when a retry runs a task, but there was a lingering task from an + // earlier attempt which also finished. The results might be OK, or they might not. + // To be safe, we'll retry the task, and do it in yet another attempt, to avoid more + // task output clobbering. + logInfo(s"Multiple completion events for task $task. Results may be corrupt," + + s" assuming task needs to be rerun.") + shuffleStage.removeOutputLoc(task.partitionId) + } else if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { + logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { shuffleStage.addOutputLoc(smt.partitionId, status) } + if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) { markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") @@ -1133,6 +1154,16 @@ class DAGScheduler( mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) } + // We also have to mark this map output as unavailable. Its possible that a *later* attempt + // has finished this task in the meantime, but when this task fails, it might end up + // deleting the mapOutput from the earlier successful attempt. + failedStage match { + case smt: ShuffleMapStage => + smt.incComputeCount(reduceId) + smt.removeOutputLoc(reduceId) + case _ => + } + // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index d02210743484c..da3916430c0b2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import scala.collection.mutable.HashMap + import org.apache.spark.ShuffleDependency import org.apache.spark.rdd.RDD import org.apache.spark.storage.BlockManagerId @@ -43,6 +45,17 @@ private[spark] class ShuffleMapStage( val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) + private val partitionComputeCount = HashMap[Int, Int]() + + def incComputeCount(partition: Int): Int = { + partitionComputeCount(partition) = partitionComputeCount.getOrElse(partition, 0) + 1 + partitionComputeCount(partition) + } + + def clearPartitionComputeCount(): Unit = { + partitionComputeCount.clear() + } + def addOutputLoc(partition: Int, status: MapStatus): Unit = { val prevList = outputLocs(partition) outputLocs(partition) = status :: prevList @@ -51,6 +64,13 @@ private[spark] class ShuffleMapStage( } } + def removeOutputLoc(partition: Int): Unit = { + if (outputLocs(partition) != Nil) { + outputLocs(partition) = Nil + numAvailableOutputs -= 1 + } + } + def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = { val prevList = outputLocs(partition) val newList = prevList.filterNot(_.location == bmAddress) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 7dc325283d961..4ecf8a85e875e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -620,12 +620,14 @@ private[spark] class TaskSetManager( val index = info.index info.markSuccessful() removeRunningTask(tid) + val task = tasks(index) sched.dagScheduler.taskEnded( - tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics) + task, Success, result.value(), result.accumUpdates, info, result.metrics) if (!successful(index)) { tasksSuccessful += 1 - logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format( - info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks)) + // include the partition here b/c on a retry, the partition is *not* the same as info.id + logInfo("Finished task %s (partition %d) in stage %s (TID %d) in %d ms on executor %s (%s) (%d/%d)".format( + info.id, task.partitionId, taskSet.id, info.taskId, info.duration, info.executorId, info.host, tasksSuccessful, numTasks)) // Mark successful and stop if all the tasks have succeeded. successful(index) = true if (tasksSuccessful == numTasks) { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala index 6ec29296ebca3..468aab2fe9867 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import com.google.common.io.ByteStreams -import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.{TaskContext, Logging, SparkConf, SparkEnv} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.storage._ @@ -42,7 +42,7 @@ import IndexShuffleBlockManager.NOOP_REDUCE_ID // Note: Changes to the format in this file should be kept in sync with // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData(). private[spark] -class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver { +class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver with Logging { private lazy val blockManager = SparkEnv.get.blockManager diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index bb4db87b79cff..b5838ffa30469 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -66,6 +66,8 @@ private[spark] class SortShuffleWriter[K, V, C]( // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) + // Because we append to the data file, we need the index file to know the current size of the + // data file as a starting point val initialFileLength = outputFile.length() val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) From 37eece86b0ef00fe6408963ffe5a8f40f7ec6990 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 May 2015 19:48:26 -0500 Subject: [PATCH 12/18] cleanup imports --- .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 2 +- .../scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 7564cc3586e0e..a6bd3020d147f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -21,7 +21,7 @@ import java.io._ import com.google.common.io.ByteStreams -import org.apache.spark.{TaskContext, Logging, SparkConf, SparkEnv} +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.storage._ diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index bdca30b8d04f2..c7c7ce6860514 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -17,7 +17,7 @@ package org.apache.spark.shuffle.sort -import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext} +import org.apache.spark.{SparkEnv, Logging, TaskContext} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle} From 31c21fae51e1aa304e4e53c7ef36d1e1689abdee Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 May 2015 20:26:50 -0500 Subject: [PATCH 13/18] style --- .../org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- .../org/apache/spark/scheduler/TaskSetManager.scala | 5 +++-- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 13 +++++++------ 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 0ced368ec5718..bb634343049b4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1042,8 +1042,8 @@ class DAGScheduler( val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) if (computeCount > 1) { - // REVIEWERS: do I need to worry about speculation here, when multiple completion events - // are normal? + // REVIEWERS: do I need to worry about speculation here, when multiple completion + // events are normal? // REVIEWERS: is this really only a problem on a ShuffleMapTask?? does it also cause // problems for ResultTask? diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4ecf8a85e875e..1168b6b0d1627 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -626,8 +626,9 @@ private[spark] class TaskSetManager( if (!successful(index)) { tasksSuccessful += 1 // include the partition here b/c on a retry, the partition is *not* the same as info.id - logInfo("Finished task %s (partition %d) in stage %s (TID %d) in %d ms on executor %s (%s) (%d/%d)".format( - info.id, task.partitionId, taskSet.id, info.taskId, info.duration, info.executorId, info.host, tasksSuccessful, numTasks)) + logInfo(("Finished task %s in stage %s (TID %d, partition %d) in %d ms on executor %s (%s) " + + "(%d/%d)").format(info.id, taskSet.id, task.partitionId, info.taskId, info.duration, + info.executorId, info.host, tasksSuccessful, numTasks)) // Mark successful and stop if all the tasks have succeeded. successful(index) = true if (tasksSuccessful == numTasks) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 712c68de107b7..b4b6d66b6cafb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -785,9 +785,10 @@ class DAGSchedulerSuite val bms = ArrayBuffer[BlockManagerId]() val stageFailureCount = HashMap[Int, Int]() clusterSc.addSparkListener(new SparkListener { - override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { - bms += blockManagerAdded.blockManagerId + override def onBlockManagerAdded(bmAdded: SparkListenerBlockManagerAdded): Unit = { + bms += bmAdded.blockManagerId } + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { if (stageCompleted.stageInfo.failureReason.isDefined) { val stage = stageCompleted.stageInfo.stageId @@ -797,10 +798,10 @@ class DAGSchedulerSuite } }) try { - val rawData = clusterSc.parallelize(1 to 1e6.toInt, 20).map{x => (x % 100) -> x}.cache() + val rawData = clusterSc.parallelize(1 to 1e6.toInt, 20).map { x => (x % 100) -> x }.cache() rawData.count() val aBm = bms(0) - val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex{ case (idx, itr) => + val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex { case (idx, itr) => // we want one failure quickly, and more failures after stage 0 has finished its // second attempt if (TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId == 0) { @@ -817,9 +818,9 @@ class DAGSchedulerSuite Thread.sleep((500 + math.random * 5000).toLong) } } - itr.map{x => ((x._1 + 5) % 100) -> x._2 } + itr.map { x => ((x._1 + 5) % 100) -> x._2 } } - val shuffledAgain = shuffled.flatMap{ case(k,vs) => vs.map(k -> _) }.groupByKey(100) + val shuffledAgain = shuffled.flatMap { case (k, vs) => vs.map(k -> _) }.groupByKey(100) val data = shuffledAgain.mapPartitions { itr => itr.flatMap(_._2) }.cache().collect() val count = data.size assert(count === 1e6.toInt) From a894be11d2d22841017d82676a66088f0d053dcf Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 13 May 2015 08:42:05 -0500 Subject: [PATCH 14/18] include all missing mapIds in error msg --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 663e434a1c1dc..3f77765d132c7 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -382,12 +382,11 @@ private[spark] object MapOutputTracker extends Logging { reduceId: Int, statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { assert (statuses != null) - var idx = -1 statuses.map { status => - idx += 1 if (status == null) { - val msg = "Missing an output location for shuffle " + shuffleId + ": map " + idx + val missing = statuses.iterator.zipWithIndex.filter{_._1 == null}.map{_._2}.mkString(",") + val msg = "Missing an output location for shuffle =" + shuffleId + "; mapIds =" + missing logError(msg) throw new MetadataFetchFailedException(shuffleId, reduceId, msg) } else { From 93592b1554ed8091c93a3f58cfd2951cb4cac088 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 13 May 2015 08:44:38 -0500 Subject: [PATCH 15/18] update existing test since we now do more resubmitting than before --- .../spark/scheduler/DAGSchedulerSuite.scala | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b4b6d66b6cafb..d975d62f5dea7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -536,7 +536,7 @@ class DAGSchedulerSuite // should be ignored for being too old runEvent(CompletionEvent( taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null)) - // should work because it's a non-failed host + // its a non-failed host, but we can't be sure if the results were clobbered runEvent(CompletionEvent( taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null)) // should be ignored for being too old @@ -546,9 +546,21 @@ class DAGSchedulerSuite taskSet.tasks(1).epoch = newEpoch runEvent(CompletionEvent( taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null)) + + //now we should have a new taskSet for stage 0, which has us retry partition 0 + assert(taskSets.size === 2) + val newTaskSet = taskSets(1) + assert(newTaskSet.stageId === 0) + assert(newTaskSet.attempt === 1) + assert(newTaskSet.tasks.size === 1) + val newTask = newTaskSet.tasks(0) + assert(newTask.epoch === newEpoch + 1) + runEvent(CompletionEvent( + newTask, Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null)) + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) - complete(taskSets(1), Seq((Success, 42), (Success, 43))) + complete(taskSets(2), Seq((Success, 42), (Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty() } From ea2d9720b26e687ddde777a83c6a6b11db663af2 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 13 May 2015 12:54:52 -0500 Subject: [PATCH 16/18] style --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d975d62f5dea7..29056eff0e57e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -547,7 +547,7 @@ class DAGSchedulerSuite runEvent(CompletionEvent( taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null)) - //now we should have a new taskSet for stage 0, which has us retry partition 0 + // now we should have a new taskSet for stage 0, which has us retry partition 0 assert(taskSets.size === 2) val newTaskSet = taskSets(1) assert(newTaskSet.stageId === 0) From 6654c538cf83126d9ae0de7e39cc19829bfa6fba Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 21 May 2015 15:02:52 -0500 Subject: [PATCH 17/18] fixes from merge --- .../org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java | 4 +++- .../apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index ad7eb04afcd8c..746676c7aa583 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -175,6 +175,8 @@ void closeAndWriteOutput() throws IOException { final SpillInfo[] spills = sorter.closeAndGetSpills(); sorter = null; final long[] partitionLengths; + final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId); + final long initialFileLength = outputFile.length(); try { partitionLengths = mergeSpills(spills); } finally { @@ -184,7 +186,7 @@ void closeAndWriteOutput() throws IOException { } } } - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); + shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths, initialFileLength); mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index 83d109115aa5c..17dffe505439d 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -175,7 +175,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; return null; } - }).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), any(long[].class)); + }).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), any(long[].class), 0L); when(diskBlockManager.createTempShuffleBlock()).thenAnswer( new Answer>() { From dd2839d1aceef3ca038c0af5cb2b69955d10cc8f Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 21 May 2015 16:54:31 -0500 Subject: [PATCH 18/18] better fix from merge --- .../apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index 17dffe505439d..ffbe8d37d025b 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -175,7 +175,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; return null; } - }).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), any(long[].class), 0L); + }).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), any(long[].class), eq(0L)); when(diskBlockManager.createTempShuffleBlock()).thenAnswer( new Answer>() {