From 8ebff777f940fd440d882999bcd5b2e771d65a3e Mon Sep 17 00:00:00 2001
From: "Wenchen Fan(Cloud)"
Date: Fri, 23 May 2014 18:12:53 +0800
Subject: [PATCH 1/5] fix compress memory issue during reduce

---
 .../org/apache/spark/storage/BlockManager.scala    | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6534095811907..6027e4a20d909 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -329,8 +329,21 @@ private[spark] class BlockManager(
    * never deletes (recent) items.
    */
   def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    diskStore.getValues(blockId, serializer).orElse(
-      sys.error("Block " + blockId + " not found on disk, though it should be"))
+    class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
+
+      lazy val proxy = f
+
+      def hasNext: Boolean = proxy.hasNext
+
+      def next(): Any = proxy.next()
+    }
+
+    if (diskStore.contains(blockId)) {
+      Some(new LazyProxyIterator(diskStore.getValues(blockId, serializer).get))
+    } else {
+      sys.error("Block " + blockId + " not found on disk, though it should be")
+      None
+    }
   }
 
   /**

From 2c8adb2a373a9a00e8e1ac5c0aec4e26e65408ba Mon Sep 17 00:00:00 2001
From: "Wenchen Fan(Cloud)"
Date: Mon, 26 May 2014 12:48:44 +0800
Subject: [PATCH 2/5] add inline comment

---
 .../scala/org/apache/spark/storage/BlockManager.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6027e4a20d909..222b29ea213ce 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -329,13 +329,18 @@ private[spark] class BlockManager(
    * never deletes (recent) items.
    */
   def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
+
+    // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+    // at the beginning. The wrapping will cost some memory(compression instance
+    // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
+    // wrapping lazily to save memory.
     class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
 
       lazy val proxy = f
 
-      def hasNext: Boolean = proxy.hasNext
+      override def hasNext: Boolean = proxy.hasNext
 
-      def next(): Any = proxy.next()
+      override def next(): Any = proxy.next()
     }
 
     if (diskStore.contains(blockId)) {

From d80c426842b624a19d68e84fbb5c09426cdd7513 Mon Sep 17 00:00:00 2001
From: "Wenchen Fan(Cloud)"
Date: Thu, 29 May 2014 14:52:16 +0800
Subject: [PATCH 3/5] remove empty lines in short class

---
 .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 222b29ea213ce..c206dcd91bcf1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -335,11 +335,8 @@ private[spark] class BlockManager(
     // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
     // wrapping lazily to save memory.
     class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
-
       lazy val proxy = f
-
       override def hasNext: Boolean = proxy.hasNext
-
       override def next(): Any = proxy.next()
     }
 

From 07f32c2b437cfee001cbfbb254b859dc1ad88058 Mon Sep 17 00:00:00 2001
From: "Wenchen Fan(Cloud)"
Date: Thu, 29 May 2014 15:43:44 +0800
Subject: [PATCH 4/5] move the LazyProxyIterator to dataDeserialize

---
 .../apache/spark/storage/BlockManager.scala   | 41 ++++++++++---------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c206dcd91bcf1..3d6dbf8641321 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -329,23 +329,8 @@ private[spark] class BlockManager(
    * never deletes (recent) items.
    */
   def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-
-    // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
-    // at the beginning. The wrapping will cost some memory(compression instance
-    // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
-    // wrapping lazily to save memory.
-    class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
-      lazy val proxy = f
-      override def hasNext: Boolean = proxy.hasNext
-      override def next(): Any = proxy.next()
-    }
-
-    if (diskStore.contains(blockId)) {
-      Some(new LazyProxyIterator(diskStore.getValues(blockId, serializer).get))
-    } else {
-      sys.error("Block " + blockId + " not found on disk, though it should be")
-      None
-    }
+    diskStore.getValues(blockId, serializer).orElse(
+      sys.error("Block " + blockId + " not found on disk, though it should be"))
   }
 
   /**
@@ -1030,8 +1015,26 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+
+    def doWork() = {
+      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+      serializer.newInstance().deserializeStream(stream).asIterator
+    }
+
+    if (blockId.isShuffle) {
+      // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+      // at the beginning. The wrapping will cost some memory(compression instance
+      // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
+      // wrapping lazily to save memory.
+      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
+        lazy val proxy = f
+        override def hasNext: Boolean = proxy.hasNext
+        override def next(): Any = proxy.next()
+      }
+      new LazyProxyIterator(doWork())
+    } else {
+      doWork()
+    }
   }
 
   def stop() {

From 0924a6b1b65bf2da34361058ee458510e4fb7926 Mon Sep 17 00:00:00 2001
From: "Wenchen Fan(Cloud)"
Date: Tue, 3 Jun 2014 14:50:54 +0800
Subject: [PATCH 5/5] rename 'doWork' into 'getIterator'

---
 .../scala/org/apache/spark/storage/BlockManager.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3d6dbf8641321..95b3791f862b9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1016,14 +1016,14 @@ private[spark] class BlockManager(
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
 
-    def doWork() = {
+    def getIterator = {
       val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
       serializer.newInstance().deserializeStream(stream).asIterator
     }
 
     if (blockId.isShuffle) {
       // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
-      // at the beginning. The wrapping will cost some memory(compression instance
+      // at the beginning. The wrapping will cost some memory (compression instance
       // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
       // wrapping lazily to save memory.
       class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
@@ -1031,9 +1031,9 @@
         lazy val proxy = f
         override def hasNext: Boolean = proxy.hasNext
         override def next(): Any = proxy.next()
       }
-      new LazyProxyIterator(doWork())
+      new LazyProxyIterator(getIterator)
     } else {
-      doWork()
+      getIterator
    }
   }
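
Note (illustration appended after the patch series, not part of it): the fix relies on a Scala by-name parameter captured in a lazy val, so the per-block setup cost (compression stream and deserializer instances) is paid only when a reducer actually starts consuming a block's iterator, not when all block iterators are wrapped up front. Below is a minimal, self-contained sketch of that pattern; the object name, the type parameter, and the expensiveIterator helper with its println calls are made-up stand-ins for demonstration, not code from BlockManager.scala.

object LazyWrappingSketch {
  // Same shape as the patch: a by-name parameter captured in a lazy val, so the
  // expensive wrapping runs only when the iterator is first used, not when it is created.
  class LazyProxyIterator[A](f: => Iterator[A]) extends Iterator[A] {
    lazy val proxy = f
    override def hasNext: Boolean = proxy.hasNext
    override def next(): A = proxy.next()
  }

  // Hypothetical stand-in for wrapForCompression + deserializeStream: the println
  // marks the moment the per-block setup cost is actually paid.
  def expensiveIterator(blockName: String): Iterator[Int] = {
    println(s"initializing decompression/deserialization for $blockName")
    Iterator(1, 2, 3)
  }

  def main(args: Array[String]): Unit = {
    // Wrap many "blocks" up front, as a reducer would: no setup happens here,
    // because the argument to LazyProxyIterator is passed by name.
    val blocks = (1 to 100).map(i => new LazyProxyIterator(expensiveIterator(s"shuffle_$i")))
    println("100 blocks wrapped, zero initialized")
    // Setup is paid one block at a time, only when each iterator is consumed.
    println(blocks.head.sum)
  }
}

Running this prints the "initializing ..." line only for the block that is actually consumed, and only at consumption time, which mirrors the behaviour the patch comments describe: many shuffle blocks wrapped cheaply, each one initialized one by one as the reducer reads it.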