diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index b89212eaabf6c..38836d44b04e8 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -236,13 +236,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey - if (rddToAdd.isDefined && rddToAdd == getRddId(blockId)) { - logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + - "block from the same RDD") - return false + if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { + selectedBlocks += blockId + selectedMemory += pair.getValue.size } - selectedBlocks += blockId - selectedMemory += pair.getValue.size } } @@ -264,6 +261,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } return true } else { + logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " + + "from the same RDD") return false } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 121e47c7b1b41..1036b9f34e9dd 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -662,4 +662,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a1") == None, "a1 should not be in store") } } + + test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") { + store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr) + store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + // Access rdd_1_0 to ensure it's not least recently used. + assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store") + // According to the same-RDD rule, rdd_1_0 should be replaced here. + store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + // rdd_1_0 should have been replaced, even it's not least recently used. + assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store") + assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store") + assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store") + } }