From f4d7a64f06e24e03e150f890e76fadaa6ecf2632 Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Thu, 12 Sep 2019 19:37:22 +0800
Subject: [PATCH 1/3] fix flaky test

---
 .../scheduler/BarrierTaskContextSuite.scala   | 36 ++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 8d5f04ac7651a..84c34445ab46f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -156,10 +156,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
     assert(error.contains("within 1 second(s)"))
   }
 
-
-  def testBarrierTaskKilled(sc: SparkContext, interruptOnCancel: Boolean): Unit = {
-    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
-
+  def testBarrierTaskKilled(interruptOnKill: Boolean): Unit = {
     withTempDir { dir =>
       val killedFlagFile = "barrier.task.killed"
       val rdd = sc.makeRDD(Seq(0, 1), 2)
@@ -181,12 +178,15 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
 
       val listener = new SparkListener {
         override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-          new Thread {
-            override def run: Unit = {
-              Thread.sleep(1000)
-              sc.killTaskAttempt(taskStart.taskInfo.taskId, interruptThread = false)
-            }
-          }.start()
+          val partitionId = taskStart.taskInfo.index
+          if (partitionId == 0) {
+            new Thread {
+              override def run: Unit = {
+                Thread.sleep(1000)
+                sc.killTaskAttempt(taskStart.taskInfo.taskId, interruptThread = interruptOnKill)
+              }
+            }.start()
+          }
         }
       }
       sc.addSparkListener(listener)
@@ -201,7 +201,18 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("barrier task killed") {
+  test("barrier task killed, no interrupt") {
+    val conf = new SparkConf()
+      .set("spark.barrier.sync.timeout", "1")
+      .set(TEST_NO_STAGE_RETRY, true)
+      .setMaster("local-cluster[4, 1, 1024]")
+      .setAppName("test-cluster")
+    sc = new SparkContext(conf)
+
+    testBarrierTaskKilled(interruptOnKill = false)
+  }
+
+  test("barrier task killed, interrupt") {
     val conf = new SparkConf()
       .set("spark.barrier.sync.timeout", "1")
       .set(TEST_NO_STAGE_RETRY, true)
@@ -209,7 +220,6 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
       .setAppName("test-cluster")
     sc = new SparkContext(conf)
 
-    testBarrierTaskKilled(sc, true)
-    testBarrierTaskKilled(sc, false)
+    testBarrierTaskKilled(interruptOnKill = true)
   }
 }
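A standalone sketch of the kill pattern that patch 1 introduces, pulled out of the suite for illustration (the class name is hypothetical; `SparkListener`, `SparkListenerTaskStart`, and `SparkContext.killTaskAttempt` are existing Spark API):

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

    // Kill only the task for partition 0, one second after it starts.
    // taskInfo.index is the task's partition index within the stage, so at
    // most one of the two barrier tasks is ever killed, and whether its
    // thread is interrupted is controlled by the caller.
    class KillPartitionZeroListener(sc: SparkContext, interruptOnKill: Boolean)
      extends SparkListener {
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
        if (taskStart.taskInfo.index == 0) {
          new Thread {
            override def run(): Unit = {
              Thread.sleep(1000)
              sc.killTaskAttempt(taskStart.taskInfo.taskId, interruptThread = interruptOnKill)
            }
          }.start()
        }
      }
    }

Registered via `sc.addSparkListener(...)` before the barrier job is submitted, as the patched test does. Killing exactly one task, with the interrupt mode made explicit, is presumably what removes the flakiness, and it is what lets the old single test split into the two variants below.
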
From 9dbdcb896c2b960555c6ade05c43e44722778813 Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Mon, 16 Sep 2019 14:05:05 +0800
Subject: [PATCH 2/3] address comments

---
 .../scheduler/BarrierTaskContextSuite.scala   | 50 +++++--------------
 1 file changed, 13 insertions(+), 37 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 84c34445ab46f..a7115242071d4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -26,13 +26,19 @@ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 
 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
 
-  test("global sync by barrier() call") {
+  def initLocalClusterSparkContext(): Unit = {
     val conf = new SparkConf()
       // Init local cluster here so each barrier task runs in a separated process, thus `barrier()`
       // call is actually useful.
       .setMaster("local-cluster[4, 1, 1024]")
       .setAppName("test-cluster")
+      .set("spark.barrier.sync.timeout", "1")
+      .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
+  }
+
+  test("global sync by barrier() call") {
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -48,10 +54,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("support multiple barrier() call within a single task") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -77,12 +80,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("throw exception on barrier() call timeout") {
-    val conf = new SparkConf()
-      .set("spark.barrier.sync.timeout", "1")
-      .set(TEST_NO_STAGE_RETRY, true)
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -102,12 +100,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("throw exception if barrier() call doesn't happen on every task") {
-    val conf = new SparkConf()
-      .set("spark.barrier.sync.timeout", "1")
-      .set(TEST_NO_STAGE_RETRY, true)
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -125,12 +118,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("throw exception if the number of barrier() calls are not the same on every task") {
-    val conf = new SparkConf()
-      .set("spark.barrier.sync.timeout", "1")
-      .set(TEST_NO_STAGE_RETRY, true)
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -202,24 +190,12 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("barrier task killed, no interrupt") {
-    val conf = new SparkConf()
-      .set("spark.barrier.sync.timeout", "1")
-      .set(TEST_NO_STAGE_RETRY, true)
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
-
+    initLocalClusterSparkContext()
     testBarrierTaskKilled(interruptOnKill = false)
   }
 
   test("barrier task killed, interrupt") {
-    val conf = new SparkConf()
-      .set("spark.barrier.sync.timeout", "1")
-      .set(TEST_NO_STAGE_RETRY, true)
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
-
+    initLocalClusterSparkContext()
     testBarrierTaskKilled(interruptOnKill = true)
   }
 }
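Patch 2 deduplicates the per-test `SparkConf` boilerplate into `initLocalClusterSparkContext()`. In `local-cluster[4, 1, 1024]` the three fields are the number of workers, cores per worker, and memory per worker in MB, so each of the four barrier tasks runs in its own executor process and `barrier()` performs a real cross-process sync. Note that the helper as written also bakes `spark.barrier.sync.timeout = 1` into every test in the suite; patch 3 below scopes that setting back to only the tests that assert on timeout behaviour.
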
From c8dcf425ea63d00be9bb02c096e480e8bf33bb4a Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Tue, 17 Sep 2019 12:23:23 +0800
Subject: [PATCH 3/3] update

---
 .../org/apache/spark/scheduler/BarrierTaskContextSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index a7115242071d4..fc8ac38479932 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -32,7 +32,6 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
       // call is actually useful.
       .setMaster("local-cluster[4, 1, 1024]")
       .setAppName("test-cluster")
-      .set("spark.barrier.sync.timeout", "1")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
   }
@@ -81,6 +80,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
 
   test("throw exception on barrier() call timeout") {
     initLocalClusterSparkContext()
+    sc.conf.set("spark.barrier.sync.timeout", "1")
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -101,6 +101,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
 
   test("throw exception if barrier() call doesn't happen on every task") {
     initLocalClusterSparkContext()
+    sc.conf.set("spark.barrier.sync.timeout", "1")
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -119,6 +120,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
 
   test("throw exception if the number of barrier() calls are not the same on every task") {
     initLocalClusterSparkContext()
+    sc.conf.set("spark.barrier.sync.timeout", "1")
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
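
For reference, a self-contained sketch of the timeout behaviour that the per-test `sc.conf.set("spark.barrier.sync.timeout", "1")` override targets. This is illustrative only and not part of the patch; the object name is made up, and it assumes an environment where the `local-cluster` master is usable (normally Spark's own test harness, with SPARK_HOME set):

    import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext, SparkException}

    object BarrierTimeoutSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local-cluster[4, 1, 1024]")  // one executor process per barrier task
          .setAppName("barrier-timeout-sketch")
          .set("spark.barrier.sync.timeout", "1")  // seconds; the suite sets this per test
          .set("spark.test.noStageRetry", "true")  // plain-string form of the suite's TEST_NO_STAGE_RETRY (assumption)
        val sc = new SparkContext(conf)
        try {
          val rdd = sc.makeRDD(1 to 10, 4)
          val rdd2 = rdd.barrier().mapPartitions { it =>
            val context = BarrierTaskContext.get()
            // Only partition 0 ever reaches barrier(), so the global sync
            // cannot complete and fails once the one-second timeout expires.
            if (context.partitionId() == 0) {
              context.barrier()
            }
            it
          }
          rdd2.collect()
        } catch {
          case e: SparkException =>
            println(s"barrier sync failed as expected: ${e.getMessage}")
        } finally {
          sc.stop()
        }
      }
    }

Mutating `sc.conf` after the context is created, as patch 3 does, works in the suite because the test class lives under the `org.apache.spark` package (`SparkContext.conf` is `private[spark]`), and the barrier sync timeout is read lazily when the first barrier stage runs rather than at SparkContext startup, which is presumably why the per-test override takes effect.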