core/src/test/resources/log4j.properties (2 changes: 1 addition & 1 deletion)
@@ -17,7 +17,7 @@
 
 # Set everything to be logged to the file target/unit-tests.log
 test.appender=file
-log4j.rootCategory=INFO, ${test.appender}
+log4j.rootCategory=DEBUG, ${test.appender}
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log
@@ -57,6 +57,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+      .set(config.EXECUTOR_CORES.key, "1")
+      .set(config.CPUS_PER_TASK.key, "1")
+      .set(config.EXECUTOR_MEMORY.key, "1024m")
       // Just replicate blocks as fast as we can during testing; there isn't another
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
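Note: the three added settings pin each test executor to exactly one concurrent task (one executor core, one CPU per task, 1024m of memory), which makes the decommissioning timing predictable. These typed entries from org.apache.spark.internal.config resolve to the standard string keys, so an equivalent configuration written against raw keys would look roughly like this sketch (builder shape illustrative; the key names are the stock Spark ones):

import org.apache.spark.SparkConf

// Equivalent raw-key configuration (illustrative sketch).
// One core per executor and one CPU per task means each executor
// runs exactly one task at a time.
val conf = new SparkConf()
  .set("spark.executor.cores", "1")      // config.EXECUTOR_CORES.key
  .set("spark.task.cpus", "1")           // config.CPUS_PER_TASK.key
  .set("spark.executor.memory", "1024m") // config.EXECUTOR_MEMORY.key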
@@ -90,6 +93,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val taskStartSem = new Semaphore(0)
     val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
+    val taskStartEvents = ArrayBuffer.empty[SparkListenerTaskStart]
     val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
     sc.addSparkListener(new SparkListener {
@@ -99,6 +103,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       }
 
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStartEvents.append(taskStart)
         taskStartSem.release()
       }
 
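Note: the semaphore-plus-listener pattern extended here is how the suite blocks the main test thread until the cluster reaches a known state (task started, broadcast done, executor removed) before acting. A minimal self-contained sketch of the pattern, assuming an active SparkContext `sc` (names are hypothetical):

import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// Record every task start and wake the waiting test thread.
val taskStartSem = new Semaphore(0)
val taskStartEvents = ArrayBuffer.empty[SparkListenerTaskStart]

sc.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    taskStartEvents.append(taskStart) // kept for later diagnostics
    taskStartSem.release()            // one permit per started task
  }
})

// ... kick off the job asynchronously, then:
taskStartSem.acquire() // blocks until at least one task has started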
@@ -145,8 +150,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")
 
     val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    val withTasks = taskStartEvents.map(_.taskInfo.executorId).toSet
+    val msg = s"Decommissioning executor ${execToDecommission} of $execs $withTasks"
+    logInfo(msg)
+    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo(msg, false))
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
@@ -178,7 +185,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
         val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
         assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
           s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
-          s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
+          s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}\n$msg")
       }
       // If we're migrating shuffles we look for any shuffle block updates
       // as there is no block update on the initial shuffle block write.
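Note: the assertion above leans on a small collections idiom: group the (block name, block manager id) pairs by block name and count entries per block, so a count above 1 means a block was reported on more than one block manager, i.e. it migrated. In isolation it behaves like this (illustrative data; `exists` is the idiomatic equivalent of `!filter(...).isEmpty`):

// (block name, block manager id) pairs from block update events
val blockLocs = Seq(("rdd_0_0", "bm-1"), ("rdd_0_0", "bm-2"), ("rdd_0_1", "bm-1"))

// Number of updates seen per block across block managers.
val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
// Map("rdd_0_0" -> 2, "rdd_0_1" -> 1)

// A block with more than one update has lived on multiple block managers.
assert(blocksToManagers.exists(_._2 > 1))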
@@ -191,11 +198,13 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
           val blockId = update.blockUpdatedInfo.blockId
           blockId.isInstanceOf[ShuffleIndexBlockId]
         }.size
-        assert(numDataLocs === 1, s"Expect shuffle data block updates in ${blocksUpdated}")
-        assert(numIndexLocs === 1, s"Expect shuffle index block updates in ${blocksUpdated}")
+        assert(numDataLocs === 1, s"Expect shuffle data block updates in ${blocksUpdated}\n$msg")
+        assert(numIndexLocs === 1, s"Expect shuffle index block updates in ${blocksUpdated}\n$msg")
       }
     }
 
+    logInfo(s"Blocks updated is $blocksUpdated")
+
     // Since the RDD is cached or shuffled, further usage of the same RDD should use the
     // cached data. The original RDD partitions should not be recomputed, i.e. the accumulator
     // should keep the same value as before.
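Note: the closing comment describes the suite's recomputation check: the job increments an accumulator inside the RDD's map function, so if the cached or migrated blocks are reused the accumulator stays flat, while any recompute would bump it again. A standalone sketch of the technique (hypothetical names, simplified logic):

// Sketch: detect RDD recomputation with an accumulator.
val accum = sc.longAccumulator("compute-count")

val rdd = sc.parallelize(1 to 100, 4).map { x =>
  accum.add(1) // counts every element actually computed
  x * 2
}.cache()

rdd.count()                 // first pass materializes the cache
val firstPass = accum.value // expect 100
rdd.count()                 // second pass should hit the cache
assert(accum.value == firstPass, "RDD partitions were recomputed")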