From e8d9616e7b38060fa2597e1e21b842cdddcb698f Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 20 May 2015 12:42:48 -0700 Subject: [PATCH 01/10] Use properties from ActiveJob associated with a Stage --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4a9518fff4e7b..b17ec2146bff9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -946,7 +946,9 @@ class DAGScheduler( stage.resetInternalAccumulators() } - val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull + // Use the scheduling pool, job group, description, etc. from an ActiveJob associated + // with this Stage + val properties = jobIdToActiveJob(jobId).properties runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are From 053471caca9ff5fc593acf2e243964d342663eba Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 26 May 2015 13:47:06 -0700 Subject: [PATCH 02/10] stage.jobId -> jobId in taskScheduler.submitTasks --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b17ec2146bff9..20b553cf37157 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1049,7 +1049,7 @@ class DAGScheduler( stage.pendingPartitions ++= tasks.map(_.partitionId) logDebug("New pending partitions: " + stage.pendingPartitions) taskScheduler.submitTasks(new TaskSet( - tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties)) + tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark From d2c94655d95db69e2c02e030de21624a7a43dfcb Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 4 Aug 2015 15:10:29 -0700 Subject: [PATCH 03/10] Added test "stage used by two jobs, the first no longer active" --- .../spark/scheduler/DAGSchedulerSuite.scala | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4d6b25455226f..72315c2cb85fa 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.Properties + import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -262,9 +264,10 @@ class DAGSchedulerSuite rdd: RDD[_], partitions: Array[Int], func: (TaskContext, Iterator[_]) => _ = jobComputeFunc, - listener: JobListener = jobListener): Int = { + listener: JobListener = jobListener, + properties: Properties = null): Int = { val jobId = scheduler.nextJobId.getAndIncrement() - runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener)) 
+ runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties)) jobId } @@ -1322,6 +1325,43 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + /** + * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a + * later, active job if they were previously run under a job that is no longer active + */ + test("stage used by two jobs, the first no longer active") { + val baseRdd = new MyRDD(sc, 1, Nil) + val finalRdd1 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) + val finalRdd2 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) + val job1Properties = new Properties() + val job2Properties = new Properties() + job1Properties.setProperty("testProperty", "job1") + job2Properties.setProperty("testProperty", "job2") + + // run job1 + val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) + assert(scheduler.activeJobs.nonEmpty) + val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty") + + // remove job1 as an ActiveJob + cancel(jobId1) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(sparkListener.failedStages.contains(0)) + assert(sparkListener.failedStages.size === 1) + assert(scheduler.activeJobs.isEmpty) + + // run job2 + val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) + assert(scheduler.activeJobs.nonEmpty) + val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") + assert(testProperty1 != testProperty2) + complete(taskSets(1), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + assert(scheduler.activeJobs.isEmpty) + + assertDataStructuresEmpty() + } + test("run trivial shuffle with out-of-band failure and retry") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) From 80b47a13a64b98f38fe294a4929bfc6f3ca1804b Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 3 Sep 2015 11:33:58 -0500 Subject: [PATCH 04/10] add test of correct behavior --- .../spark/scheduler/DAGSchedulerSuite.scala | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 72315c2cb85fa..965fb1ae3d423 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1329,33 +1329,48 @@ class DAGSchedulerSuite * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a * later, active job if they were previously run under a job that is no longer active */ - test("stage used by two jobs, the first no longer active") { + test("stage used by two jobs, the first no longer active (SPARK-6880)") { val baseRdd = new MyRDD(sc, 1, Nil) - val finalRdd1 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) - val finalRdd2 = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) + val shuffleDep1 = new ShuffleDependency(baseRdd, null) + val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1)) + val shuffleDep2 = new ShuffleDependency(intermediateRdd, null) + val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2)) + val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2)) val job1Properties = new Properties() val job2Properties = new Properties() job1Properties.setProperty("testProperty", "job1") job2Properties.setProperty("testProperty", 
"job2") - // run job1 + // run both job 1 & 2, referencing the same stage, then cancel job1 val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) + val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) assert(scheduler.activeJobs.nonEmpty) val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty") // remove job1 as an ActiveJob cancel(jobId1) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(sparkListener.failedStages.contains(0)) - assert(sparkListener.failedStages.size === 1) - assert(scheduler.activeJobs.isEmpty) - // run job2 - val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) + // job2 should still be running assert(scheduler.activeJobs.nonEmpty) val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") assert(testProperty1 != testProperty2) - complete(taskSets(1), Seq((Success, 42))) + assert(taskSets(0).properties != null) + // NB: this is next assert isn't necessarily the "desired" behavior, its more so to just document + // the current behavior. We've already submitted the task set for stage 0 based on job1 -- + // even though we have cancelled that job, and now we're running it b/c of job2, we haven't + // updated its properties. It might be desirable to have this actually change to "job2" + assert(taskSets(0).properties.getProperty("testProperty") === "job1") + complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) + + // the next two asserts the key checks for SPARK-6880 -- they make sure that the stage which + // was shared by both jobs, but never submitted any tasks for the first job, takes the props + // of the second job + assert(taskSets(1).properties != null) + assert(taskSets(1).properties.getProperty("testProperty") === "job2") + complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1)))) + assert(taskSets(2).properties != null) + complete(taskSets(2), Seq((Success, 42))) assert(results === Map(0 -> 42)) assert(scheduler.activeJobs.isEmpty) From 6f000214cce4404e3e1c55239de47d924624477b Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 3 Sep 2015 11:44:55 -0500 Subject: [PATCH 05/10] expand comment --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 965fb1ae3d423..7b23869c21168 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1342,6 +1342,9 @@ class DAGSchedulerSuite job2Properties.setProperty("testProperty", "job2") // run both job 1 & 2, referencing the same stage, then cancel job1 + // Note that we have to submit job2 before we cancel job1, to have them actually share + // *Stages*, and not just shuffle dependencies, due to skipped stages. 
(at least until + // we address SPARK-10193) val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) assert(scheduler.activeJobs.nonEmpty) From 5b77f892a96641d4aeb1f7d08a09c09144d9dc36 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 3 Sep 2015 16:52:44 -0500 Subject: [PATCH 06/10] add a test which includes a fetch failure --- .../spark/scheduler/DAGSchedulerSuite.scala | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 7b23869c21168..3c2854ff73d29 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1380,6 +1380,79 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + /** + * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a + * later, active job if they were previously run under a job that is no longer active, even when + * there are fetch failures + */ + test("stage used by two jobs, some fetch failures, and the first job no longer active " + + "(SPARK-6880)") { + val baseRdd = new MyRDD(sc, 1, Nil) + val shuffleDep1 = new ShuffleDependency(baseRdd, null) + val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1)) + val shuffleDep2 = new ShuffleDependency(intermediateRdd, null) + val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2)) + val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2)) + val job1Properties = new Properties() + val job2Properties = new Properties() + job1Properties.setProperty("testProperty", "job1") + job2Properties.setProperty("testProperty", "job2") + + def checkJobProperties(taskSet: TaskSet, expected: String): Unit = { + assert(taskSet.properties != null) + assert(taskSet.properties.getProperty("testProperty") === expected) + } + + // run both job 1 & 2, referencing the same stage, then cancel job1 + // Note that we have to submit job2 before we cancel job1, to have them actually share + // *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until + // we address SPARK-10193) + val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) + val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) + assert(scheduler.activeJobs.nonEmpty) + val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty") + + // job 1 finishes stage 0 + assert(taskSets(0).properties.getProperty("testProperty") === "job1") + complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) + + // remove job1 as an ActiveJob + cancel(jobId1) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // job2 should still be running, starts from stage 1 + assert(scheduler.activeJobs.nonEmpty) + val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") + assert(testProperty1 != testProperty2) + // NB: this is next assert isn't necessarily the "desired" behavior, its more so to just document + // the current behavior. We've already submitted the task set for stage 0 based on job1 -- + // even though we have cancelled that job, and now we're running it b/c of job2, we haven't + // updated its properties. 
It might be desirable to have this actually change to "job2" + checkJobProperties(taskSets(1), "job1") + + // but lets say there is a fetch failure in this task set, which makes us go back and + // run stage 0, attempt 1 + complete(taskSets(1), Seq( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null))) + scheduler.resubmitFailedStages() + + // but stage 0, attempt 1 should have the properties of job2 + assert(taskSets(2).stageId === 0) + assert(taskSets(2).stageAttemptId === 1) + checkJobProperties(taskSets(2), "job2") + + // run the rest of the stages normally, checking they have the right properties + complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) + checkJobProperties(taskSets(3), "job2") + complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1)))) + checkJobProperties(taskSets(4), "job2") + complete(taskSets(4), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + assert(scheduler.activeJobs.isEmpty) + + assertDataStructuresEmpty() + } + test("run trivial shuffle with out-of-band failure and retry") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) From 4454a4e703e7874616a05f2a899b88753ed1fcf5 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 3 Sep 2015 16:55:37 -0500 Subject: [PATCH 07/10] cleanup comments --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3c2854ff73d29..d6325124ff67a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1430,13 +1430,13 @@ class DAGSchedulerSuite // updated its properties. 
It might be desirable to have this actually change to "job2" checkJobProperties(taskSets(1), "job1") - // but lets say there is a fetch failure in this task set, which makes us go back and + // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 complete(taskSets(1), Seq( (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null))) scheduler.resubmitFailedStages() - // but stage 0, attempt 1 should have the properties of job2 + // stage 0, attempt 1 should have the properties of job2 assert(taskSets(2).stageId === 0) assert(taskSets(2).stageAttemptId === 1) checkJobProperties(taskSets(2), "job2") From 091e19a06933e0d8d0733e396f6c5eea4c45f1b0 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 8 Sep 2015 10:50:30 -0700 Subject: [PATCH 08/10] A tiny amount of refactoring and some comment editing --- .../spark/scheduler/DAGSchedulerSuite.scala | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d6325124ff67a..4f9a284949057 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1325,6 +1325,11 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + def checkJobProperties(taskSet: TaskSet, expected: String): Unit = { + assert(taskSet.properties != null) + assert(taskSet.properties.getProperty("testProperty") === expected) + } + /** * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a * later, active job if they were previously run under a job that is no longer active @@ -1341,10 +1346,10 @@ class DAGSchedulerSuite job1Properties.setProperty("testProperty", "job1") job2Properties.setProperty("testProperty", "job2") - // run both job 1 & 2, referencing the same stage, then cancel job1 - // Note that we have to submit job2 before we cancel job1, to have them actually share - // *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until - // we address SPARK-10193) + // Run jobs 1 & 2, both referencing the same stage, then cancel job1. + // Note that we have to submit job2 before we cancel job1 to have them actually share + // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until + // we address SPARK-10193.) val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) assert(scheduler.activeJobs.nonEmpty) @@ -1359,18 +1364,19 @@ class DAGSchedulerSuite val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") assert(testProperty1 != testProperty2) assert(taskSets(0).properties != null) - // NB: this is next assert isn't necessarily the "desired" behavior, its more so to just document - // the current behavior. We've already submitted the task set for stage 0 based on job1 -- - // even though we have cancelled that job, and now we're running it b/c of job2, we haven't - // updated its properties. It might be desirable to have this actually change to "job2" + // NB: This next assert isn't necessarily the "desired" behavior; it's just to document + // the current behavior. 
We've already submitted the TaskSet for stage 0 based on job1, but + // even though we have cancelled that job and are now running it because of job2, we haven't + // updated the TaskSet's properties. Changing the properties to "job2" is likely the more + // correct behavior. assert(taskSets(0).properties.getProperty("testProperty") === "job1") complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) - // the next two asserts the key checks for SPARK-6880 -- they make sure that the stage which - // was shared by both jobs, but never submitted any tasks for the first job, takes the props - // of the second job - assert(taskSets(1).properties != null) - assert(taskSets(1).properties.getProperty("testProperty") === "job2") + // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and + // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run + // the stage. + checkJobProperties(taskSets(1), "job2") + complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1)))) assert(taskSets(2).properties != null) complete(taskSets(2), Seq((Success, 42))) @@ -1403,10 +1409,10 @@ class DAGSchedulerSuite assert(taskSet.properties.getProperty("testProperty") === expected) } - // run both job 1 & 2, referencing the same stage, then cancel job1 - // Note that we have to submit job2 before we cancel job1, to have them actually share - // *Stages*, and not just shuffle dependencies, due to skipped stages. (at least until - // we address SPARK-10193) + // Run jobs 1 & 2, both referencing the same stage, then cancel job1. + // Note that we have to submit job2 before we cancel job1 to have them actually share + // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until + // we address SPARK-10193.) val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) assert(scheduler.activeJobs.nonEmpty) @@ -1424,10 +1430,11 @@ class DAGSchedulerSuite assert(scheduler.activeJobs.nonEmpty) val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") assert(testProperty1 != testProperty2) - // NB: this is next assert isn't necessarily the "desired" behavior, its more so to just document - // the current behavior. We've already submitted the task set for stage 0 based on job1 -- - // even though we have cancelled that job, and now we're running it b/c of job2, we haven't - // updated its properties. It might be desirable to have this actually change to "job2" + // NB: This next assert isn't necessarily the "desired" behavior; it's just to document + // the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but + // even though we have cancelled that job and are now running it because of job2, we haven't + // updated the TaskSet's properties. Changing the properties to "job2" is likely the more + // correct behavior. 
checkJobProperties(taskSets(1), "job1") // lets say there is a fetch failure in this task set, which makes us go back and @@ -1441,7 +1448,7 @@ class DAGSchedulerSuite assert(taskSets(2).stageAttemptId === 1) checkJobProperties(taskSets(2), "job2") - // run the rest of the stages normally, checking they have the right properties + // run the rest of the stages normally, checking that they have the correct properties complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) checkJobProperties(taskSets(3), "job2") complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1)))) From 0a4ef2457b2f08f9313659f8a4df345e207c707b Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 21 Oct 2015 11:13:27 -0700 Subject: [PATCH 09/10] DRY code review --- .../spark/scheduler/DAGSchedulerSuite.scala | 63 ++++--------------- 1 file changed, 13 insertions(+), 50 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4f9a284949057..2332a0146e862 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1330,11 +1330,7 @@ class DAGSchedulerSuite assert(taskSet.properties.getProperty("testProperty") === expected) } - /** - * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a - * later, active job if they were previously run under a job that is no longer active - */ - test("stage used by two jobs, the first no longer active (SPARK-6880)") { + def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = { val baseRdd = new MyRDD(sc, 1, Nil) val shuffleDep1 = new ShuffleDependency(baseRdd, null) val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1)) @@ -1363,15 +1359,24 @@ class DAGSchedulerSuite assert(scheduler.activeJobs.nonEmpty) val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") assert(testProperty1 != testProperty2) - assert(taskSets(0).properties != null) // NB: This next assert isn't necessarily the "desired" behavior; it's just to document // the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but // even though we have cancelled that job and are now running it because of job2, we haven't // updated the TaskSet's properties. Changing the properties to "job2" is likely the more // correct behavior. - assert(taskSets(0).properties.getProperty("testProperty") === "job1") + checkJobProperties(taskSets(0), "job1") complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) + shuffleDep1 + } + + /** + * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a + * later, active job if they were previously run under a job that is no longer active + */ + test("stage used by two jobs, the first no longer active (SPARK-6880)") { + launchJobsThatShareStageAndCancelFirst() + // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run // the stage. 
@@ -1393,49 +1398,7 @@ class DAGSchedulerSuite */ test("stage used by two jobs, some fetch failures, and the first job no longer active " + "(SPARK-6880)") { - val baseRdd = new MyRDD(sc, 1, Nil) - val shuffleDep1 = new ShuffleDependency(baseRdd, null) - val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1)) - val shuffleDep2 = new ShuffleDependency(intermediateRdd, null) - val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2)) - val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2)) - val job1Properties = new Properties() - val job2Properties = new Properties() - job1Properties.setProperty("testProperty", "job1") - job2Properties.setProperty("testProperty", "job2") - - def checkJobProperties(taskSet: TaskSet, expected: String): Unit = { - assert(taskSet.properties != null) - assert(taskSet.properties.getProperty("testProperty") === expected) - } - - // Run jobs 1 & 2, both referencing the same stage, then cancel job1. - // Note that we have to submit job2 before we cancel job1 to have them actually share - // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until - // we address SPARK-10193.) - val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties) - val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties) - assert(scheduler.activeJobs.nonEmpty) - val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty") - - // job 1 finishes stage 0 - assert(taskSets(0).properties.getProperty("testProperty") === "job1") - complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) - - // remove job1 as an ActiveJob - cancel(jobId1) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - - // job2 should still be running, starts from stage 1 - assert(scheduler.activeJobs.nonEmpty) - val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty") - assert(testProperty1 != testProperty2) - // NB: This next assert isn't necessarily the "desired" behavior; it's just to document - // the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but - // even though we have cancelled that job and are now running it because of job2, we haven't - // updated the TaskSet's properties. Changing the properties to "job2" is likely the more - // correct behavior. 
- checkJobProperties(taskSets(1), "job1") + val shuffleDep1 = launchJobsThatShareStageAndCancelFirst() // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 From 31ba5dea62715bf86b8bdeb4ed2205fafa6f63ef Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 13 Nov 2015 13:05:46 -0800 Subject: [PATCH 10/10] code review --- .../spark/scheduler/DAGSchedulerSuite.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2332a0146e862..653d41fc053c9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1325,16 +1325,17 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } - def checkJobProperties(taskSet: TaskSet, expected: String): Unit = { + def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = { assert(taskSet.properties != null) assert(taskSet.properties.getProperty("testProperty") === expected) + assert(taskSet.priority === priority) } def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = { val baseRdd = new MyRDD(sc, 1, Nil) - val shuffleDep1 = new ShuffleDependency(baseRdd, null) + val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1)) val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1)) - val shuffleDep2 = new ShuffleDependency(intermediateRdd, null) + val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1)) val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2)) val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2)) val job1Properties = new Properties() @@ -1353,7 +1354,6 @@ class DAGSchedulerSuite // remove job1 as an ActiveJob cancel(jobId1) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // job2 should still be running assert(scheduler.activeJobs.nonEmpty) @@ -1364,7 +1364,8 @@ class DAGSchedulerSuite // even though we have cancelled that job and are now running it because of job2, we haven't // updated the TaskSet's properties. Changing the properties to "job2" is likely the more // correct behavior. - checkJobProperties(taskSets(0), "job1") + val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob + checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id) complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) shuffleDep1 @@ -1380,7 +1381,7 @@ class DAGSchedulerSuite // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run // the stage. 
- checkJobProperties(taskSets(1), "job2") + checkJobPropertiesAndPriority(taskSets(1), "job2", 1) complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1)))) assert(taskSets(2).properties != null) @@ -1399,6 +1400,7 @@ class DAGSchedulerSuite test("stage used by two jobs, some fetch failures, and the first job no longer active " + "(SPARK-6880)") { val shuffleDep1 = launchJobsThatShareStageAndCancelFirst() + val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 @@ -1409,13 +1411,13 @@ class DAGSchedulerSuite // stage 0, attempt 1 should have the properties of job2 assert(taskSets(2).stageId === 0) assert(taskSets(2).stageAttemptId === 1) - checkJobProperties(taskSets(2), "job2") + checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id) // run the rest of the stages normally, checking that they have the correct properties complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) - checkJobProperties(taskSets(3), "job2") + checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id) complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1)))) - checkJobProperties(taskSets(4), "job2") + checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id) complete(taskSets(4), Seq((Success, 42))) assert(results === Map(0 -> 42)) assert(scheduler.activeJobs.isEmpty)
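
For readers outside the patch context, here is a minimal, hypothetical sketch of the user-level scenario this series addresses (SPARK-6880): two jobs share a shuffle stage but carry different local properties (for example a fair-scheduler pool or job group), and the first job stops being an ActiveJob before the shared stage has submitted tasks for it. The RDD names, pool names, and the cancellation step below are illustrative assumptions, not code taken from the patches; before this series, DAGScheduler.submitMissingTasks resolved properties through stage.firstJobId, so a shared stage could run with the no-longer-active job's (possibly null) properties instead of those of the still-active job.

    import org.apache.spark.{SparkConf, SparkContext}

    object Spark6880Sketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("SPARK-6880 sketch").setMaster("local[2]"))

        // A shuffle dependency that two downstream jobs will share.
        val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i))
        val shuffled = pairs.reduceByKey(_ + _)

        // Job 1 runs under pool "pool1" and is assumed to be cancelled
        // (e.g. via sc.cancelJobGroup("job1-group")) before the shared
        // stage has submitted tasks on its behalf.
        sc.setLocalProperty("spark.scheduler.pool", "pool1")
        sc.setJobGroup("job1-group", "first job, later cancelled")
        // ... job 1 submitted against `shuffled`, then cancelled ...

        // Job 2 reuses the same stage but with different properties. With this
        // series applied, stages that have not yet submitted tasks run with
        // job 2's properties and TaskSet priority, because they are resolved
        // through an ActiveJob associated with the stage rather than
        // stage.firstJobId.
        sc.setLocalProperty("spark.scheduler.pool", "pool2")
        sc.setJobGroup("job2-group", "second job, still active")
        shuffled.count()

        sc.stop()
      }
    }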