From 62c92ac7b8e616529bdaa52b73eb70e50bc01b47 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 7 Mar 2014 16:32:47 +0800
Subject: [PATCH 1/4] Fixed SPARK-1194

https://spark-project.atlassian.net/browse/SPARK-1194
---
 .../apache/spark/storage/MemoryStore.scala    | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index b89212eaabf6c..c104a1a47f02b 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -236,13 +236,23 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
-          if (rddToAdd.isDefined && rddToAdd == getRddId(blockId)) {
-            logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
-              "block from the same RDD")
-            return false
+          // Apply the same-RDD rule for cache replacement. Quoted from the
+          // original RDD paper:
+          //
+          // When a new RDD partition is computed but there is not enough
+          // space to store it, we evict a partition from the least recently
+          // accessed RDD, unless this is the same RDD as the one with the
+          // new partition. In that case, we keep the old partition in memory
+          // to prevent cycling partitions from the same RDD in and out.
+          //
+          // TODO implement LRU eviction
+          rddToAdd match {
+            case Some(rddId) if rddId == getRddId(blockId) =>
+              // no-op
+            case _ =>
+              selectedBlocks += blockId
+              selectedMemory += pair.getValue.size
           }
-          selectedBlocks += blockId
-          selectedMemory += pair.getValue.size
         }
       }

From 40cdcb2334dc1d66194a1516e7ec998908b7cc78 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sat, 8 Mar 2014 01:08:16 +0800
Subject: [PATCH 2/4] Bug fix, and addressed PR comments from @mridulm

Fixed bug: getRddId(blockId) returns an Option[Int], which never equals
rddId: Int.
---
 .../org/apache/spark/storage/MemoryStore.scala | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index c104a1a47f02b..5a667a80e0f26 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -244,14 +244,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           // accessed RDD, unless this is the same RDD as the one with the
           // new partition. In that case, we keep the old partition in memory
           // to prevent cycling partitions from the same RDD in and out.
-          //
-          // TODO implement LRU eviction
-          rddToAdd match {
-            case Some(rddId) if rddId == getRddId(blockId) =>
-              // no-op
-            case _ =>
-              selectedBlocks += blockId
-              selectedMemory += pair.getValue.size
+          if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
+            selectedBlocks += blockId
+            selectedMemory += pair.getValue.size
           }
         }
       }
@@ -274,6 +269,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         }
         return true
       } else {
+        logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " +
+          "from the same RDD")
         return false
       }
     }

From 6e40c222f5b140ef3ba355ca3d655e19ee0039be Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sat, 8 Mar 2014 02:56:45 +0800
Subject: [PATCH 3/4] Remove redundant comments

---
 .../main/scala/org/apache/spark/storage/MemoryStore.scala | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 5a667a80e0f26..38836d44b04e8 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -236,14 +236,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
-          // Apply the same-RDD rule for cache replacement. Quoted from the
-          // original RDD paper:
-          //
-          // When a new RDD partition is computed but there is not enough
-          // space to store it, we evict a partition from the least recently
-          // accessed RDD, unless this is the same RDD as the one with the
-          // new partition. In that case, we keep the old partition in memory
-          // to prevent cycling partitions from the same RDD in and out.
           if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
             selectedBlocks += blockId
             selectedMemory += pair.getValue.size

From 2524ab951df5311ca2dc3f6637f9e6823481e9f2 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sat, 8 Mar 2014 12:53:53 +0800
Subject: [PATCH 4/4] Added regression test case for SPARK-1194

---
 .../apache/spark/storage/BlockManagerSuite.scala | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 121e47c7b1b41..1036b9f34e9dd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -662,4 +662,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       assert(store.getSingle("a1") == None, "a1 should not be in store")
     }
   }
+
+  test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    // Access rdd_1_0 to ensure it's not the least recently used.
+    assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
+    // According to the same-RDD rule, rdd_1_0 should be replaced here.
+    store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    // rdd_1_0 should have been replaced, even though it's not the least recently used.
+    assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
+    assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
+    assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
+  }
 }
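
The series converges on a simple selection rule: walk the cached blocks in LRU order, skip
any block that belongs to the same RDD as the block being added, and refuse the store (with
the "Will not store ..." log message) only if not enough memory can be freed that way. Below
is a minimal, self-contained Scala sketch of that rule for readers who want to try it in
isolation; it is an illustration, not the MemoryStore code, and the Block type, the
selectBlocksToEvict helper, and the sizes used in main are assumptions made for the example.

// Standalone illustration of the same-RDD replacement rule; not Spark's MemoryStore.
import scala.collection.mutable.ArrayBuffer

object SameRddRuleSketch {

  // Hypothetical stand-in for BlockId plus getRddId: names look like "rdd_<rddId>_<split>".
  final case class Block(name: String) {
    def rddId: Option[Int] = {
      val parts = name.split('_')
      if (parts.length == 3 && parts(0) == "rdd") Some(parts(1).toInt) else None
    }
  }

  // Walks entries in least-recently-used-first order and picks blocks to drop, skipping
  // any block of the same RDD as `blockToAdd`. Returns Some(blocks to drop) when enough
  // memory can be freed, or None when the new block should not be stored at all.
  def selectBlocksToEvict(
      lruEntries: Seq[(Block, Long)],
      blockToAdd: Block,
      space: Long,
      maxMemory: Long,
      currentMemory: Long): Option[Seq[Block]] = {
    val rddToAdd = blockToAdd.rddId
    val selected = ArrayBuffer.empty[Block]
    var selectedMemory = 0L
    val it = lruEntries.iterator
    while (maxMemory - (currentMemory - selectedMemory) < space && it.hasNext) {
      val (block, size) = it.next()
      // Same-RDD rule: never evict a partition of the RDD we are trying to cache.
      if (rddToAdd.isEmpty || rddToAdd != block.rddId) {
        selected += block
        selectedMemory += size
      }
    }
    if (maxMemory - (currentMemory - selectedMemory) >= space) Some(selected.toList) else None
  }

  def main(args: Array[String]): Unit = {
    // A 1200-byte store holding two 500-byte partitions; rdd_0_0 is the least recently used.
    val entries = Seq(Block("rdd_0_0") -> 500L, Block("rdd_1_0") -> 500L)
    // Caching rdd_0_1 (500 bytes) selects rdd_1_0 for eviction rather than the older
    // rdd_0_0, because rdd_0_0 belongs to the same RDD as the incoming block.
    println(selectBlocksToEvict(entries, Block("rdd_0_1"), 500L, 1200L, 1000L))
    // prints: Some(List(Block(rdd_1_0)))
  }
}

This mirrors the shape of the regression test in PATCH 4/4: the most recently used block is
dropped when the only older block belongs to the RDD being cached.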
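
The bug called out in PATCH 2/4's message, comparing an Option[Int] against a bare Int with
==, always evaluates to false, which is why the pattern guard introduced in PATCH 1/4 never
matched. A small sketch of the pitfall, using a hypothetical getRddId that stands in for the
MemoryStore helper of the same name:

// Illustrates why `rddId == getRddId(blockId)` (Int vs Option[Int]) never matches.
object OptionEqualityPitfall {
  def getRddId(blockName: String): Option[Int] =
    if (blockName.startsWith("rdd_")) Some(blockName.split('_')(1).toInt) else None

  def main(args: Array[String]): Unit = {
    val rddId: Int = 0
    println(rddId == getRddId("rdd_0_0"))            // false: Int compared with Option[Int]
    println(Some(rddId) == getRddId("rdd_0_0"))      // true: Option compared with Option
    println(getRddId("rdd_0_0").exists(_ == rddId))  // true: unwrap and compare
  }
}

The final form of the fix sidesteps the issue entirely by comparing two Options:
rddToAdd != getRddId(blockId).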