From d20fa4312b811d10ba993dcbb7a9bebc24d5a56c Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Sun, 3 Sep 2017 14:43:57 +0800
Subject: [PATCH 1/8] avoid call reserveUnrollMemoryForThisTask every record

---
 .../apache/spark/storage/memory/MemoryStore.scala | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 90e3af2d0ec7..ea4389188770 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -329,6 +329,8 @@ private[spark] class MemoryStore(
     val initialMemoryThreshold = unrollMemoryThreshold
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
+    // Memory to request as a multiple of current reserved size
+    val memoryGrowthFactor = 1.5
     // Underlying buffer for unrolling the block
     val redirectableStream = new RedirectableOutputStream
     val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
@@ -359,7 +361,7 @@ private[spark] class MemoryStore(
 
     def reserveAdditionalMemoryIfNecessary(): Unit = {
       if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
+        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
         keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
@@ -375,10 +377,16 @@ private[spark] class MemoryStore(
 
     // Make sure that we have enough memory to store the block. By this point, it is possible that
     // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
-    // perform one final call to attempt to allocate additional memory if necessary.
+    // perform one final call to attempt to allocate additional memory if necessary. If the task
+    // reserved more memory than it needed, then release the extra memory that will not be used.
     if (keepUnrolling) {
       serializationStream.close()
-      reserveAdditionalMemoryIfNecessary()
+      if (bbos.size < unrollMemoryUsedByThisBlock) {
+        val excessUnrollMemory = unrollMemoryUsedByThisBlock - bbos.size
+        releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
+      } else {
+        reserveAdditionalMemoryIfNecessary()
+      }
     }
 
     if (keepUnrolling) {

From 9b0eed976b7b32f65e9403daeae009e52cf47a38 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Wed, 6 Sep 2017 09:03:25 +0800
Subject: [PATCH 2/8] small fix

---
 .../org/apache/spark/storage/memory/MemoryStore.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index ea4389188770..b7c8ad5b13fe 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -329,8 +329,6 @@ private[spark] class MemoryStore(
     val initialMemoryThreshold = unrollMemoryThreshold
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
-    // Memory to request as a multiple of current reserved size
-    val memoryGrowthFactor = 1.5
     // Underlying buffer for unrolling the block
     val redirectableStream = new RedirectableOutputStream
     val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
@@ -349,6 +347,9 @@ private[spark] class MemoryStore(
       ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
     }
 
+    // Since 'bbos' grows by one 'chunkSize' at a time, request an additional 'chunkSize' of
+    // memory up front to avoid unnecessary synchronization.
+    var additionalMemoryRequest = chunkSize
     // Request enough memory to begin unrolling
     keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
 
@@ -361,7 +362,7 @@ private[spark] class MemoryStore(
 
     def reserveAdditionalMemoryIfNecessary(): Unit = {
       if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
+        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock + additionalMemoryRequest
         keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
@@ -385,6 +386,7 @@ private[spark] class MemoryStore(
         val excessUnrollMemory = unrollMemoryUsedByThisBlock - bbos.size
         releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
       } else {
+        additionalMemoryRequest = 0
         reserveAdditionalMemoryIfNecessary()
       }
     }

From 8c25b2b3fc2c2f76f258201db1e03d56a3bc0e94 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Wed, 6 Sep 2017 09:04:33 +0800
Subject: [PATCH 3/8] small fix

---
 .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index b7c8ad5b13fe..1cf4df276ad1 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -362,7 +362,7 @@ private[spark] class MemoryStore(
 
     def reserveAdditionalMemoryIfNecessary(): Unit = {
       if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock + additionalMemoryRequest 
+        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock + additionalMemoryRequest
         keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
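The first three patches iterate on one idea: reserve unroll memory ahead of what the serialized stream has actually used (first by a growth factor, then by one extra chunk), and settle the account once at the end instead of on every record. The following self-contained sketch models that accounting; every name in it is illustrative rather than the real MemoryStore API:

  // Illustrative model of reserve-ahead unroll accounting (not Spark code).
  object UnrollAccountingSketch {
    def main(args: Array[String]): Unit = {
      val memoryGrowthFactor = 1.5
      var reserved = 1024L * 1024 // initial reservation: 1 MiB
      var used = 0L               // bytes actually written so far

      // Stand-in for reserveAdditionalMemoryIfNecessary(): grow the reservation
      // to memoryGrowthFactor * current size once usage catches up with it.
      def reserveIfNecessary(): Unit = {
        if (used > reserved) {
          val amountToRequest = (used * memoryGrowthFactor - reserved).toLong
          reserved += amountToRequest
        }
      }

      for (_ <- 1 to 100) {       // pretend each record serializes to 64 KiB
        used += 64L * 1024
        reserveIfNecessary()
      }

      // Stand-in for releaseUnrollMemoryForThisTask(): return the unused excess.
      val excess = math.max(0L, reserved - used)
      reserved -= excess
      println(s"used=$used reserved=$reserved released=$excess")
    }
  }

Because the reservation runs ahead of actual usage, most iterations skip the memory manager entirely; only the occasional catch-up call synchronizes.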
From 4b1f9dd3b15caf9a867ae3d4b62b92a24c127893 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Mon, 11 Sep 2017 15:50:12 +0800
Subject: [PATCH 4/8] change reserve more memory to check every batch records

---
 .../spark/storage/memory/MemoryStore.scala | 25 +++++++++++--------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 1cf4df276ad1..eb5f9b0a15e7 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -325,6 +325,10 @@ private[spark] class MemoryStore(
     // Whether there is still enough memory for us to continue unrolling this block
     var keepUnrolling = true
+    // Number of elements unrolled so far
+    var elementsUnrolled = 0L
+    // How often to check whether we need to request more memory
+    val memoryCheckPeriod = 16
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
@@ -347,9 +351,6 @@ private[spark] class MemoryStore(
       ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
     }
 
-    // Since 'bbos' grows by one 'chunkSize' at a time, request an additional 'chunkSize' of
-    // memory up front to avoid unnecessary synchronization.
-    var additionalMemoryRequest = chunkSize
     // Request enough memory to begin unrolling
     keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
 
@@ -363,7 +363,7 @@ private[spark] class MemoryStore(
 
     def reserveAdditionalMemoryIfNecessary(): Unit = {
       if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock + additionalMemoryRequest
+        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
         keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
@@ -373,22 +374,18 @@ private[spark] class MemoryStore(
     // Unroll this block safely, checking whether we have exceeded our threshold
     while (values.hasNext && keepUnrolling) {
       serializationStream.writeObject(values.next())(classTag)
-      reserveAdditionalMemoryIfNecessary()
+      elementsUnrolled += 1
+      if (elementsUnrolled % memoryCheckPeriod == 0) {
+        reserveAdditionalMemoryIfNecessary()
+      }
     }
 
     // Make sure that we have enough memory to store the block. By this point, it is possible that
     // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
-    // perform one final call to attempt to allocate additional memory if necessary. If the task
-    // reserved more memory than it needed, then release the extra memory that will not be used.
+    // perform one final call to attempt to allocate additional memory if necessary.
     if (keepUnrolling) {
       serializationStream.close()
-      if (bbos.size < unrollMemoryUsedByThisBlock) {
-        val excessUnrollMemory = unrollMemoryUsedByThisBlock - bbos.size
-        releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
-      } else {
-        additionalMemoryRequest = 0
-        reserveAdditionalMemoryIfNecessary()
-      }
+      reserveAdditionalMemoryIfNecessary
     }
 
     if (keepUnrolling) {
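PATCH 4 abandons the chunk-sized padding in favor of simply checking less often: the reservation is revisited only every memoryCheckPeriod records, matching the existing check in the vector-based unroll path. A minimal model of the batching, with a plain counter standing in for the real reservation call:

  // Toy demonstration: 1000 records, but only 1000 / 16 = 62 memory checks.
  object CheckPeriodSketch {
    def main(args: Array[String]): Unit = {
      val memoryCheckPeriod = 16L
      var elementsUnrolled = 0L
      var memoryChecks = 0

      for (_ <- 1 to 1000) {
        elementsUnrolled += 1
        if (elementsUnrolled % memoryCheckPeriod == 0) {
          memoryChecks += 1   // stands in for reserveAdditionalMemoryIfNecessary()
        }
      }
      println(s"records=$elementsUnrolled checks=$memoryChecks")
    }
  }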
From 8059aff4e9e768081a6946378c57cfe9b498fbb5 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Mon, 11 Sep 2017 15:51:17 +0800
Subject: [PATCH 5/8] small fix

---
 .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index eb5f9b0a15e7..bc44d5866610 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -385,7 +385,7 @@ private[spark] class MemoryStore(
     // perform one final call to attempt to allocate additional memory if necessary.
     if (keepUnrolling) {
       serializationStream.close()
-      reserveAdditionalMemoryIfNecessary
+      reserveAdditionalMemoryIfNecessary()
     }
 
     if (keepUnrolling) {

From e7ebbd2ce4e4476527b3fcc8a6ed38de15e64d99 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Fri, 15 Sep 2017 09:42:43 +0800
Subject: [PATCH 6/8] make parameter configurable and request memory with factor

---
 .../org/apache/spark/storage/memory/MemoryStore.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index bc44d5866610..35d2a79dc568 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -190,11 +190,11 @@ private[spark] class MemoryStore(
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // How often to check whether we need to request more memory
-    val memoryCheckPeriod = 16
+    val memoryCheckPeriod = conf.getLong("spark.storage.unrollMemoryCheckPeriod", 16)
     // Memory currently reserved by this task for this particular unrolling operation
     var memoryThreshold = initialMemoryThreshold
     // Memory to request as a multiple of current vector size
-    val memoryGrowthFactor = 1.5
+    val memoryGrowthFactor = conf.getDouble("spark.storage.unrollMemoryGrowthFactor", 1.5)
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
     // Underlying vector for unrolling the block
@@ -328,7 +328,9 @@ private[spark] class MemoryStore(
     // Number of elements unrolled so far
     var elementsUnrolled = 0L
     // How often to check whether we need to request more memory
-    val memoryCheckPeriod = 16
+    val memoryCheckPeriod = conf.getLong("spark.storage.unrollMemoryCheckPeriod", 16)
+    // Memory to request as a multiple of current bbos size
+    val memoryGrowthFactor = conf.getDouble("spark.storage.unrollMemoryGrowthFactor", 1.5)
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // Keep track of unroll memory used by this particular block / putIterator() operation
@@ -363,7 +365,7 @@ private[spark] class MemoryStore(
 
     def reserveAdditionalMemoryIfNecessary(): Unit = {
       if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
+        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
         keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
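With PATCH 6 each check requests up to memoryGrowthFactor times the current serialized size, so one successful reservation buys headroom for many subsequent records. A worked example with assumed sizes (8 MiB written so far, 6 MiB currently reserved):

  // Assumed values, only to show the arithmetic of the PATCH 6 request.
  object GrowthFactorSketch {
    def main(args: Array[String]): Unit = {
      val memoryGrowthFactor = 1.5
      val bbosSize = 8L * 1024 * 1024                     // serialized so far: 8 MiB
      val unrollMemoryUsedByThisBlock = 6L * 1024 * 1024  // reserved so far: 6 MiB

      val amountToRequest =
        (bbosSize * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
      // 8 MiB * 1.5 - 6 MiB = 6 MiB extra, so the reservation lands at 12 MiB,
      // leaving 4 MiB of headroom before another request is needed.
      println(s"amountToRequest=${amountToRequest / (1024 * 1024)} MiB")
    }
  }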
From 7f97cc2e39eb106ee405a460e0db5e6a62991b4d Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Sun, 17 Sep 2017 20:37:38 +0800
Subject: [PATCH 7/8] address comments

---
 .../org/apache/spark/internal/config/package.scala | 13 +++++++++++++
 .../apache/spark/storage/memory/MemoryStore.scala  |  9 +++++----
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0d3769a73586..2c063a7ea9a1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -385,4 +385,17 @@ package object config {
       .checkValue(v => v > 0 && v <= Int.MaxValue,
         s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
       .createWithDefault(1024 * 1024)
+
+  private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
+    ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
+      .doc("The memory check period is used to determine how often we should check whether "
+        + "there is a need to request more memory when we try to put the given block in memory.")
+      .longConf
+      .createWithDefault(16)
+
+  private[spark] val UNROLL_MEMORY_GROWTH_FACTOR =
+    ConfigBuilder("spark.storage.unrollMemoryGrowthFactor")
+      .doc("Memory to request as a multiple of the size used to unroll the block.")
+      .doubleConf
+      .createWithDefault(1.5)
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 35d2a79dc568..eb2201d142ff 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,6 +29,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
@@ -190,11 +191,11 @@ private[spark] class MemoryStore(
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // How often to check whether we need to request more memory
-    val memoryCheckPeriod = conf.getLong("spark.storage.unrollMemoryCheckPeriod", 16)
+    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
     // Memory currently reserved by this task for this particular unrolling operation
     var memoryThreshold = initialMemoryThreshold
     // Memory to request as a multiple of current vector size
-    val memoryGrowthFactor = conf.getDouble("spark.storage.unrollMemoryGrowthFactor", 1.5)
+    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
     // Underlying vector for unrolling the block
@@ -328,9 +329,9 @@ private[spark] class MemoryStore(
     // Number of elements unrolled so far
     var elementsUnrolled = 0L
     // How often to check whether we need to request more memory
-    val memoryCheckPeriod = conf.getLong("spark.storage.unrollMemoryCheckPeriod", 16)
+    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
     // Memory to request as a multiple of current bbos size
-    val memoryGrowthFactor = conf.getDouble("spark.storage.unrollMemoryGrowthFactor", 1.5)
+    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // Keep track of unroll memory used by this particular block / putIterator() operation
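PATCH 7 replaces the raw string keys with typed config entries, so call sites write conf.get(UNROLL_MEMORY_CHECK_PERIOD) instead of repeating the key and the default. The sketch below is a simplified stand-in for Spark's ConfigBuilder/ConfigEntry machinery, just to show why the pattern removes duplication:

  // Toy typed-config pattern; ConfigEntry and Conf here are NOT Spark's classes.
  final case class ConfigEntry[T](key: String, default: T, parse: String => T)

  final class Conf(settings: Map[String, String]) {
    def get[T](entry: ConfigEntry[T]): T =
      settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
  }

  object TypedConfSketch {
    val UNROLL_MEMORY_CHECK_PERIOD =
      ConfigEntry[Long]("spark.storage.unrollMemoryCheckPeriod", 16L, _.toLong)
    val UNROLL_MEMORY_GROWTH_FACTOR =
      ConfigEntry[Double]("spark.storage.unrollMemoryGrowthFactor", 1.5, _.toDouble)

    def main(args: Array[String]): Unit = {
      val conf = new Conf(Map("spark.storage.unrollMemoryCheckPeriod" -> "32"))
      println(conf.get(UNROLL_MEMORY_CHECK_PERIOD))  // 32, from the settings map
      println(conf.get(UNROLL_MEMORY_GROWTH_FACTOR)) // 1.5, the default
    }
  }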
From e1dc7a46afb99fba9b489e6a5359bc347799e876 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Mon, 18 Sep 2017 18:09:35 +0800
Subject: [PATCH 8/8] address comments

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2c063a7ea9a1..e0f696080e56 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -388,13 +388,15 @@ package object config {
 
   private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
     ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
+      .internal()
       .doc("The memory check period is used to determine how often we should check whether "
-        + "there is a need to request more memory when we try to put the given block in memory.")
+        + "there is a need to request more memory when we try to unroll the given block in memory.")
       .longConf
      .createWithDefault(16)
 
   private[spark] val UNROLL_MEMORY_GROWTH_FACTOR =
     ConfigBuilder("spark.storage.unrollMemoryGrowthFactor")
+      .internal()
       .doc("Memory to request as a multiple of the size used to unroll the block.")
       .doubleConf
       .createWithDefault(1.5)
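Taken together, the series leaves putIteratorAsBytes checking memory only every spark.storage.unrollMemoryCheckPeriod records and over-requesting by spark.storage.unrollMemoryGrowthFactor. A condensed, runnable model of the final loop follows; sizes are simulated and reserve() always succeeds, standing in for reserveUnrollMemoryForThisTask:

  // Condensed model of the post-series unroll loop (illustrative, not Spark code).
  object FinalLoopSketch {
    def main(args: Array[String]): Unit = {
      val memoryCheckPeriod = 16L   // spark.storage.unrollMemoryCheckPeriod
      val memoryGrowthFactor = 1.5  // spark.storage.unrollMemoryGrowthFactor
      var unrollMemoryUsedByThisBlock = 1024L * 1024 // initial reservation
      var elementsUnrolled = 0L
      var bbosSize = 0L
      var keepUnrolling = true

      def reserve(amount: Long): Boolean = true // assume memory is always granted

      def reserveAdditionalMemoryIfNecessary(): Unit = {
        if (bbosSize > unrollMemoryUsedByThisBlock) {
          val amountToRequest =
            (bbosSize * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
          keepUnrolling = reserve(amountToRequest)
          if (keepUnrolling) unrollMemoryUsedByThisBlock += amountToRequest
        }
      }

      val records = Iterator.fill(500)(new Array[Byte](128 * 1024))
      while (records.hasNext && keepUnrolling) {
        bbosSize += records.next().length // stands in for writeObject(...)
        elementsUnrolled += 1
        if (elementsUnrolled % memoryCheckPeriod == 0) {
          reserveAdditionalMemoryIfNecessary()
        }
      }
      if (keepUnrolling) reserveAdditionalMemoryIfNecessary() // final top-up
      println(s"size=$bbosSize reserved=$unrollMemoryUsedByThisBlock")
    }
  }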