From 2710de064ee3cad8c5db117f8fd4b1a984303b1e Mon Sep 17 00:00:00 2001
From: "jianran.tfh"
Date: Thu, 16 Mar 2017 15:05:28 +0800
Subject: [PATCH 1/3] same rdd rule testcase

---
 .../spark/storage/BlockManagerSuite.scala     | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 87c8628ce97e9..ce4c68a493e0c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -595,9 +595,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
     assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
-    // Do a get() on rdd_0_2 so that it is the most recently used item
-    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
-    // Put in more partitions from RDD 0; they should replace rdd_1_1
+    // Same-RDD rule: Put in more partitions from RDD 0; they should replace rdd_1_1
     store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
@@ -609,6 +607,20 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
   }
 
+  test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
+    store = makeBlockManager(12000)
+    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    // Access rdd_1_0 to ensure it's not least recently used.
+    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
+    // According to the same-RDD rule, rdd_1_0 should be replaced here.
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    // rdd_1_0 should have been replaced, even it's not least recently used.
+    assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
+    assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
+    assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
+  }
+
   test("on-disk storage") {
     store = makeBlockManager(1200)
     val a1 = new Array[Byte](400)
@@ -1064,19 +1076,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0), RDDBlockId(1, 1)))
   }
 
-  test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
-    store = makeBlockManager(12000)
-    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    // Access rdd_1_0 to ensure it's not least recently used.
-    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
-    // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    // rdd_1_0 should have been replaced, even it's not least recently used.
-    assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
-    assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
-    assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
-  }
+
 
   test("safely unroll blocks through putIterator (disk)") {
     store = makeBlockManager(12000)

From a740052594352f45267812fc65a04082dcf3913c Mon Sep 17 00:00:00 2001
From: "jianran.tfh"
Date: Thu, 16 Mar 2017 18:07:17 +0800
Subject: [PATCH 2/3] rollback

---
 .../spark/storage/BlockManagerSuite.scala     | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ce4c68a493e0c..87c8628ce97e9 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -595,7 +595,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
     assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
-    // Same-RDD rule: Put in more partitions from RDD 0; they should replace rdd_1_1
+    // Do a get() on rdd_0_2 so that it is the most recently used item
+    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+    // Put in more partitions from RDD 0; they should replace rdd_1_1
     store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
@@ -607,20 +609,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
   }
 
-  test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
-    store = makeBlockManager(12000)
-    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    // Access rdd_1_0 to ensure it's not least recently used.
-    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
-    // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    // rdd_1_0 should have been replaced, even it's not least recently used.
-    assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
-    assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
-    assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
-  }
-
   test("on-disk storage") {
     store = makeBlockManager(1200)
     val a1 = new Array[Byte](400)
@@ -1076,7 +1064,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0), RDDBlockId(1, 1)))
   }
 
-
+  test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
+    store = makeBlockManager(12000)
+    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    // Access rdd_1_0 to ensure it's not least recently used.
+    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
+    // According to the same-RDD rule, rdd_1_0 should be replaced here.
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    // rdd_1_0 should have been replaced, even it's not least recently used.
+    assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
+    assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
+    assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
+  }
 
   test("safely unroll blocks through putIterator (disk)") {
     store = makeBlockManager(12000)

From b3b97086bbb5e4415e5694fcc0cd33f4511645a9 Mon Sep 17 00:00:00 2001
From: "jianran.tfh"
Date: Thu, 16 Mar 2017 18:17:30 +0800
Subject: [PATCH 3/3] remove get

---
 .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 87c8628ce97e9..2a03d19619a5f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -595,8 +595,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
     assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
-    // Do a get() on rdd_0_2 so that it is the most recently used item
-    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
     store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
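
For context, the same-RDD rule exercised by the SPARK-1194 test can be summed up in one sentence: when evicting blocks to make room for a new block of some RDD, blocks belonging to that same RDD are never chosen as victims, even when they are the least recently used entries. The sketch below illustrates only that rule; ToyMemoryStore and RddBlock are made-up names for this illustration, and Spark's real eviction path (MemoryStore.evictBlocksToFreeSpace, driven by the BlockManager used in the suite) is considerably more involved.

import scala.collection.mutable

// An RDD block is identified by the RDD it belongs to and its split index,
// mirroring the rdd(rddId, splitId) helper used in BlockManagerSuite.
case class RddBlock(rddId: Int, splitId: Int)

// A toy LRU store illustrating the same-RDD rule: when making room for a new
// block of RDD r, blocks that also belong to r are never evicted, even if they
// are the least recently used entries.
class ToyMemoryStore(capacity: Long) {
  // LinkedHashMap preserves insertion order; we re-insert on access so that
  // iteration order is least-recently-used first.
  private val blocks = mutable.LinkedHashMap[RddBlock, Long]()
  private var used = 0L

  def contains(id: RddBlock): Boolean = blocks.contains(id)

  // A successful get moves the block to the most-recently-used position.
  def get(id: RddBlock): Boolean = blocks.remove(id) match {
    case Some(size) => blocks.put(id, size); true
    case None => false
  }

  // Put a block, evicting LRU blocks as needed, but never a block of the
  // same RDD as the incoming block (the same-RDD rule).
  def put(id: RddBlock, size: Long): Boolean = {
    val victims = mutable.ArrayBuffer[RddBlock]()
    var free = capacity - used
    val it = blocks.iterator
    while (free < size && it.hasNext) {
      val (candidate, candidateSize) = it.next()
      if (candidate.rddId != id.rddId) { // skip blocks of the same RDD
        victims += candidate
        free += candidateSize
      }
    }
    if (free < size) {
      false // cannot make room without breaking the same-RDD rule
    } else {
      victims.foreach(v => used -= blocks.remove(v).get)
      blocks.put(id, size)
      used += size
      true
    }
  }
}

With a toy capacity of 8000 (small enough that only two 4000-byte blocks fit), putting RddBlock(0, 0) and RddBlock(1, 0), touching RddBlock(1, 0) with get, and then putting RddBlock(0, 1) evicts RddBlock(1, 0) even though it is the most recently used entry, because RddBlock(0, 0) is protected by the rule. That is exactly the ordering and the assertions in the regression test above.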