From 8b29c74be6d7b593a6e8dea624856bea1e9bceed Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 5 Jul 2020 13:21:25 -0700
Subject: [PATCH 1/2] [SPARK-32100][TESTS][FOLLOWUP] Reduce the required test resources

---
 .../scheduler/WorkerDecommissionExtendedSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 02c72fa349a7..98112a32ee6c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -32,17 +32,17 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
   private val conf = new org.apache.spark.SparkConf()
     .setAppName(getClass.getName)
-    .set(SPARK_MASTER, "local-cluster[20,1,512]")
+    .set(SPARK_MASTER, "local-cluster[10,1,512]")
     .set(EXECUTOR_MEMORY, "512m")
     .set(DYN_ALLOCATION_ENABLED, true)
     .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
-    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 20)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 10)
     .set(WORKER_DECOMMISSION_ENABLED, true)
 
   test("Worker decommission and executor idle timeout") {
     sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 10, 60000)
       val rdd1 = sc.parallelize(1 to 10, 2)
       val rdd2 = rdd1.map(x => (1, x))
       val rdd3 = rdd2.reduceByKey(_ + _)
@@ -54,10 +54,10 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
     }
   }
 
-  test("Decommission 19 executors from 20 executors in total") {
+  test("Decommission 9 executors from 10 executors in total") {
     sc = new SparkContext(conf)
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 10, 60000)
       val rdd1 = sc.parallelize(1 to 100000, 200)
       val rdd2 = rdd1.map(x => (x % 100, x))
       val rdd3 = rdd2.reduceByKey(_ + _)

From 3790334a4f46852f02ba9820fe74fed31d7a636b Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 5 Jul 2020 20:03:16 -0700
Subject: [PATCH 2/2] Address comments

---
 .../scheduler/WorkerDecommissionExtendedSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 98112a32ee6c..4de5aaeab5c5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -32,17 +32,17 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
   private val conf = new org.apache.spark.SparkConf()
     .setAppName(getClass.getName)
-    .set(SPARK_MASTER, "local-cluster[10,1,512]")
+    .set(SPARK_MASTER, "local-cluster[5,1,512]")
     .set(EXECUTOR_MEMORY, "512m")
     .set(DYN_ALLOCATION_ENABLED, true)
     .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
-    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 10)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)
     .set(WORKER_DECOMMISSION_ENABLED, true)
 
   test("Worker decommission and executor idle timeout") {
     sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 10, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
       val rdd1 = sc.parallelize(1 to 10, 2)
       val rdd2 = rdd1.map(x => (1, x))
       val rdd3 = rdd2.reduceByKey(_ + _)
@@ -54,10 +54,10 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
     }
   }
 
-  test("Decommission 9 executors from 10 executors in total") {
+  test("Decommission 4 executors from 5 executors in total") {
     sc = new SparkContext(conf)
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 10, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
       val rdd1 = sc.parallelize(1 to 100000, 200)
       val rdd2 = rdd1.map(x => (x % 100, x))
       val rdd3 = rdd2.reduceByKey(_ + _)