From 2f6264da5022838a128d9b81aed81669921a653c Mon Sep 17 00:00:00 2001
From: Devesh Agrawal
Date: Wed, 22 Jul 2020 08:44:02 -0700
Subject: [PATCH] Trying to reduce the flakiness of BlockManagerDecommissioningIntegrationSuite

---
 core/src/test/resources/log4j.properties      |  2 +-
 ...kManagerDecommissionIntegrationSuite.scala | 19 ++++++++++++++-----
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 2f46ce1553ee..b288ea9cf6f5 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -17,7 +17,7 @@
 
 # Set everything to be logged to the file target/unit-tests.log
 test.appender=file
-log4j.rootCategory=INFO, ${test.appender}
+log4j.rootCategory=DEBUG, ${test.appender}
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 57410103dd08..c772338382ec 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -57,6 +57,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+      .set(config.EXECUTOR_CORES.key, "1")
+      .set(config.CPUS_PER_TASK.key, "1")
+      .set(config.EXECUTOR_MEMORY.key, "1024m")
       // Just replicate blocks as fast as we can during testing, there isn't another
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
@@ -90,6 +93,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val taskStartSem = new Semaphore(0)
     val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
+    val taskStartEvents = ArrayBuffer.empty[SparkListenerTaskStart]
     val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
     sc.addSparkListener(new SparkListener {
@@ -99,6 +103,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       }
 
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStartEvents.append(taskStart)
         taskStartSem.release()
       }
 
@@ -145,8 +150,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")
 
     val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    val withTasks = taskStartEvents.map(_.taskInfo.executorId).toSet
+    val msg = s"Decommissioning executor ${execToDecommission} of $execs $withTasks"
+    logInfo(msg)
+    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo(msg, false))
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
@@ -178,7 +185,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
         val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
         assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
           s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
-          s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
+          s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}\n$msg")
       }
       // If we're migrating shuffles we look for any shuffle block updates
       // as there is no block update on the initial shuffle block write.
@@ -191,11 +198,13 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
           val blockId = update.blockUpdatedInfo.blockId
          blockId.isInstanceOf[ShuffleIndexBlockId]
         }.size
-        assert(numDataLocs === 1, s"Expect shuffle data block updates in ${blocksUpdated}")
-        assert(numIndexLocs === 1, s"Expect shuffle index block updates in ${blocksUpdated}")
+        assert(numDataLocs === 1, s"Expect shuffle data block updates in ${blocksUpdated}\n$msg")
+        assert(numIndexLocs === 1, s"Expect shuffle index block updates in ${blocksUpdated}\n$msg")
       }
     }
 
+    logInfo(s"Blocks updated is $blocksUpdated")
+
     // Since the RDD is cached or shuffled so further usage of same RDD should use the
     // cached data. Original RDD partitions should not be recomputed i.e. accum
     // should have same value like before