From 7b899c2433634b8472cbce45ecd5cee84cdd4de0 Mon Sep 17 00:00:00 2001 From: wangfei Date: Fri, 9 May 2014 20:10:37 +0800 Subject: [PATCH 01/13] =?UTF-8?q?=E3=80=90SPARK-1779=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit add a warning when memoryFraction is not between 0 and 1 --- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6d7d4f922e1fa..92d602289edc2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1045,6 +1045,9 @@ private[spark] object BlockManager extends Logging { def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) + if (memoryFraction > 1 && memoryFraction <= 0) { + logWarning("spark.storage.memoryFraction should be between 0 and 1.") + } (Runtime.getRuntime.maxMemory * memoryFraction).toLong } From a59d76b28f7a646744e04b9f0ccd6ad99ad84203 Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 10 May 2014 00:48:36 +0800 Subject: [PATCH 02/13] Throw exception when memoryFraction is out of range --- .../scala/org/apache/spark/storage/BlockManager.scala | 2 +- .../spark/util/collection/ExternalAppendOnlyMap.scala | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 92d602289edc2..7234a9de98b62 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1046,7 +1046,7 @@ private[spark] object BlockManager extends Logging { def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = 
conf.getDouble("spark.storage.memoryFraction", 0.6) if (memoryFraction > 1 && memoryFraction <= 0) { - logWarning("spark.storage.memoryFraction should be between 0 and 1.") + throw new Exception("spark.storage.memoryFraction should be between 0 and 1.") } (Runtime.getRuntime.maxMemory * memoryFraction).toLong } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 170f09be21534..a0b7b317d682c 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -76,6 +76,16 @@ class ExternalAppendOnlyMap[K, V, C]( private val maxMemoryThreshold = { val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3) val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8) + + if (memoryFraction > 1 && memoryFraction <= 0) { + throw new Exception("spark.shuffle.memoryFraction should be between 0 and 1.") + } + + if (safetyFraction > 1 && safetyFraction <= 0) { + throw new Exception("spark.shuffle.safetyFraction should be between 0 and 1.") + } + + (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong } From 764965f602e2bc931ab7aebaf88bd8d0e425dd7d Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 10 May 2014 09:03:46 +0800 Subject: [PATCH 03/13] Update ExternalAppendOnlyMap.scala --- .../spark/util/collection/ExternalAppendOnlyMap.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index a0b7b317d682c..69365a81044b0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -76,16 +76,12 @@ class ExternalAppendOnlyMap[K, V, C]( private val maxMemoryThreshold = { val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3) val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8) - - if (memoryFraction > 1 && memoryFraction <= 0) { + if (memoryFraction > 1 && memoryFraction < 0) { throw new Exception("spark.shuffle.memoryFraction should be between 0 and 1.") } - - if (safetyFraction > 1 && safetyFraction <= 0) { + if (safetyFraction > 1 && safetyFraction < 0) { throw new Exception("spark.shuffle.safetyFraction should be between 0 and 1.") } - - (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong } From dff1f0fd2678876ad3a85bb188957b88ec1ade8e Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 10 May 2014 09:03:52 +0800 Subject: [PATCH 04/13] Update BlockManager.scala --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 7234a9de98b62..c4071f561af09 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1045,7 +1045,7 @@ private[spark] object BlockManager extends Logging { def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) - if (memoryFraction > 1 && memoryFraction <= 0) { + if (memoryFraction > 1 && memoryFraction < 0) { throw new Exception("spark.storage.memoryFraction should be between 0 and 1.") } (Runtime.getRuntime.maxMemory * memoryFraction).toLong From 14d18ac8204488fb06a7db9f9a014a59cb3bbc00 Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 10 May 2014 11:21:13 +0800 Subject: [PATCH 05/13] throw 
IllegalArgumentException --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c4071f561af09..0256af8574783 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1046,7 +1046,7 @@ private[spark] object BlockManager extends Logging { def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) if (memoryFraction > 1 && memoryFraction < 0) { - throw new Exception("spark.storage.memoryFraction should be between 0 and 1.") + throw new IllegalArgumentException("spark.storage.memoryFraction should be between 0 and 1.") } (Runtime.getRuntime.maxMemory * memoryFraction).toLong } From cf38bcfd23dab466ce2e160700537e20ae6f24ea Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 10 May 2014 11:21:46 +0800 Subject: [PATCH 06/13] throw IllegalArgumentException --- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 69365a81044b0..548aeeba36b4d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -77,10 +77,10 @@ class ExternalAppendOnlyMap[K, V, C]( val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3) val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8) if (memoryFraction > 1 && memoryFraction < 0) { - throw new Exception("spark.shuffle.memoryFraction should be between 0 and 1.") + throw new 
IllegalArgumentException("spark.shuffle.memoryFraction should be between 0 and 1.") } if (safetyFraction > 1 && safetyFraction < 0) { - throw new Exception("spark.shuffle.safetyFraction should be between 0 and 1.") + throw new IllegalArgumentException("spark.shuffle.safetyFraction should be between 0 and 1.") } (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong } From 43621bdb860ca588ed0ce87798994881ad4998af Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 10 May 2014 16:18:34 +0800 Subject: [PATCH 07/13] && => || --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0256af8574783..feb89f4596485 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1045,7 +1045,7 @@ private[spark] object BlockManager extends Logging { def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) - if (memoryFraction > 1 && memoryFraction < 0) { + if (memoryFraction > 1 || memoryFraction < 0) { throw new IllegalArgumentException("spark.storage.memoryFraction should be between 0 and 1.") } (Runtime.getRuntime.maxMemory * memoryFraction).toLong From 2e79b3d72670cc45859629fdd13706d9728bc008 Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 10 May 2014 16:19:21 +0800 Subject: [PATCH 08/13] && => || --- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 548aeeba36b4d..375f23bee9f5f 100644 --- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -76,10 +76,10 @@ class ExternalAppendOnlyMap[K, V, C]( private val maxMemoryThreshold = { val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3) val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8) - if (memoryFraction > 1 && memoryFraction < 0) { + if (memoryFraction > 1 || memoryFraction < 0) { throw new IllegalArgumentException("spark.shuffle.memoryFraction should be between 0 and 1.") } - if (safetyFraction > 1 && safetyFraction < 0) { + if (safetyFraction > 1 || safetyFraction < 0) { throw new IllegalArgumentException("spark.shuffle.safetyFraction should be between 0 and 1.") } (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong From fc4547623b2f911e486a5986ea3155f0a23e23f1 Mon Sep 17 00:00:00 2001 From: wangfei Date: Sun, 11 May 2014 20:59:10 +0800 Subject: [PATCH 09/13] validate memoryfraction in sparkconf --- .../scala/org/apache/spark/SparkConf.scala | 21 +++++++++++++++++++ .../apache/spark/storage/BlockManager.scala | 3 --- .../collection/ExternalAppendOnlyMap.scala | 6 ------ 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index bd21fdc5a18e4..e36702efd2002 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -236,6 +236,27 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { } } + // Validate memoryFraction + val storageMemFraction = getDouble("spark.storage.memoryFraction", 0.6) + val shuffleMemFraction = getDouble("spark.shuffle.memoryFraction", 0.3) + val shuffleSafFraction = getDouble("spark.shuffle.safetyFraction", 0.8) + + if (storageMemFraction > 1 || storageMemFraction < 0) { + val msg = 
s"spark.storage.memoryFraction should be between 0 and 1 " + + s"(was '$storageMemFraction')." + throw new IllegalArgumentException(msg) + } + if (shuffleMemFraction > 1 || shuffleMemFraction < 0) { + val msg = s"spark.shuffle.memoryFraction should be between 0 and 1 " + + s"(was '$shuffleMemFraction')." + throw new IllegalArgumentException(msg) + } + if (shuffleSafFraction > 1 || shuffleSafFraction < 0) { + val msg = s"spark.shuffle.safetyFraction should be between 0 and 1 " + + s"(was '$shuffleSafFraction')." + throw new IllegalArgumentException(msg) + } + // Check for legacy configs sys.env.get("SPARK_JAVA_OPTS").foreach { value => val error = diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index feb89f4596485..6d7d4f922e1fa 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1045,9 +1045,6 @@ private[spark] object BlockManager extends Logging { def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) - if (memoryFraction > 1 || memoryFraction < 0) { - throw new IllegalArgumentException("spark.storage.memoryFraction should be between 0 and 1.") - } (Runtime.getRuntime.maxMemory * memoryFraction).toLong } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 375f23bee9f5f..170f09be21534 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -76,12 +76,6 @@ class ExternalAppendOnlyMap[K, V, C]( private val maxMemoryThreshold = { val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3) val safetyFraction = 
sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8) - if (memoryFraction > 1 || memoryFraction < 0) { - throw new IllegalArgumentException("spark.shuffle.memoryFraction should be between 0 and 1.") - } - if (safetyFraction > 1 || safetyFraction < 0) { - throw new IllegalArgumentException("spark.shuffle.safetyFraction should be between 0 and 1.") - } (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong } From 717c0ca7a106bd021017f41f2d377e641c15ce04 Mon Sep 17 00:00:00 2001 From: wangfei Date: Mon, 16 Jun 2014 23:25:43 +0800 Subject: [PATCH 10/13] updated to make more concise --- .../scala/org/apache/spark/SparkConf.scala | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index e36702efd2002..8ddb18f9956c3 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -237,24 +237,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { } // Validate memoryFraction - val storageMemFraction = getDouble("spark.storage.memoryFraction", 0.6) - val shuffleMemFraction = getDouble("spark.shuffle.memoryFraction", 0.3) - val shuffleSafFraction = getDouble("spark.shuffle.safetyFraction", 0.8) - - if (storageMemFraction > 1 || storageMemFraction < 0) { - val msg = s"spark.storage.memoryFraction should be between 0 and 1 " + - s"(was '$storageMemFraction')." - throw new IllegalArgumentException(msg) - } - if (shuffleMemFraction > 1 || shuffleMemFraction < 0) { - val msg = s"spark.shuffle.memoryFraction should be between 0 and 1 " + - s"(was '$shuffleMemFraction')." - throw new IllegalArgumentException(msg) - } - if (shuffleSafFraction > 1 || shuffleSafFraction < 0) { - val msg = s"spark.shuffle.safetyFraction should be between 0 and 1 " + - s"(was '$shuffleSafFraction')." 
- throw new IllegalArgumentException(msg) + val memoryKeys = Seq( + "spark.storage.memoryFraction", + "spark.shuffle.memoryFraction", + "spark.shuffle.safetyFraction") + for (key -> memoryKeys) { + val value = getDouble(key, 0.5) + if (value > 1 || value < 0) { + throw new IllegalArgumentException("$key should be between 0 and (was '$value').") + } } // Check for legacy configs From 829a195bb65a41569561b39c1f3f3eef8d0a7288 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 17 Jun 2014 10:12:17 +0800 Subject: [PATCH 11/13] add indent --- core/src/main/scala/org/apache/spark/SparkConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 8ddb18f9956c3..deed045c2b97c 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -244,7 +244,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { for (key -> memoryKeys) { val value = getDouble(key, 0.5) if (value > 1 || value < 0) { - throw new IllegalArgumentException("$key should be between 0 and (was '$value').") + throw new IllegalArgumentException("$key should be between 0 and (was '$value').") } } From da6ee59f68013ed888520fbd37069d526b74440a Mon Sep 17 00:00:00 2001 From: wangfei Date: Mon, 4 Aug 2014 13:41:14 +0800 Subject: [PATCH 12/13] add configs --- core/src/main/scala/org/apache/spark/SparkConf.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index deed045c2b97c..cbb182202fa6e 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -240,11 +240,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { val memoryKeys = Seq( "spark.storage.memoryFraction", 
"spark.shuffle.memoryFraction", - "spark.shuffle.safetyFraction") + "spark.shuffle.safetyFraction", + "spark.storage.unrollFraction", + "spark.storage.safetyFraction") for (key -> memoryKeys) { val value = getDouble(key, 0.5) if (value > 1 || value < 0) { - throw new IllegalArgumentException("$key should be between 0 and (was '$value').") + throw new IllegalArgumentException("$key should be between 0 and 1 (was '$value').") } } From 6e385b9c061aff8d540b59bd29ed92eccf2cd335 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 5 Aug 2014 09:35:08 +0800 Subject: [PATCH 13/13] Update SparkConf.scala --- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index cbb182202fa6e..95653ec1038a3 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -236,14 +236,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { } } - // Validate memoryFraction + // Validate memory fractions val memoryKeys = Seq( "spark.storage.memoryFraction", "spark.shuffle.memoryFraction", "spark.shuffle.safetyFraction", "spark.storage.unrollFraction", "spark.storage.safetyFraction") - for (key -> memoryKeys) { + for (key <- memoryKeys) { val value = getDouble(key, 0.5) if (value > 1 || value < 0) { throw new IllegalArgumentException("$key should be between 0 and 1 (was '$value').")