diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 29300de7d6638..e30ac62772f54 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -130,7 +130,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log /** Remove all blocks belonging to the given broadcast. */ def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) { - val future = askDriverWithReply[Future[Seq[Int]]](RemoveBroadcast(broadcastId, removeFromMaster)) + val future = + askDriverWithReply[Future[Seq[Int]]](RemoveBroadcast(broadcastId, removeFromMaster)) future.onFailure { case e: Throwable => logError("Failed to remove broadcast " + broadcastId + diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 4eeeb9aa9c7ab..ec3b679d29362 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -171,8 +171,9 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { /** Remove all the blocks / files and metadata related to a particular shuffle. */ def removeShuffle(shuffleId: ShuffleId): Boolean = { + val success = removeShuffleBlocks(shuffleId) shuffleStates.remove(shuffleId) - removeShuffleBlocks(shuffleId) + success } /** Remove all the blocks / files related to a particular shuffle. */ diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 9eb434ed0ac0e..83c9854aa51e9 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -285,7 +285,7 @@ class CleanerTester( assert(rddIds.forall(!sc.persistentRdds.contains(_))) assert(rddIds.forall(rddId => !blockManager.master.contains(rddBlockId(rddId)))) - // Verify all shuffles have been deregistered and cleaned up + // Verify all shuffles have been de-registered and cleaned up assert(shuffleIds.forall(!mapOutputTrackerMaster.containsShuffle(_))) assert(shuffleIds.forall(sid => !diskBlockManager.containsBlock(shuffleBlockId(sid))))