From 30744be4f9318b8534b605502fc6a15cfb76b84e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 9 Dec 2015 17:59:41 -0800 Subject: [PATCH 1/5] Deprecate spark.unsafe.offHeap in favor of spark.memory.useOffHeap. --- .../main/scala/org/apache/spark/SparkConf.scala | 4 +++- .../org/apache/spark/memory/MemoryManager.scala | 6 +++++- .../spark/memory/TaskMemoryManagerSuite.java | 15 ++++++++++++--- .../shuffle/sort/PackedRecordPointerSuite.java | 4 ++-- .../shuffle/sort/ShuffleInMemorySorterSuite.java | 4 ++-- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- .../unsafe/map/AbstractBytesToBytesMapSuite.java | 2 +- .../unsafe/sort/UnsafeExternalSorterSuite.java | 2 +- .../unsafe/sort/UnsafeInMemorySorterSuite.java | 4 ++-- docs/configuration.md | 16 ++++++++++++++++ .../sql/execution/joins/HashedRelation.scala | 6 +++++- .../UnsafeFixedWidthAggregationMapSuite.scala | 2 +- .../execution/UnsafeKVExternalSorterSuite.scala | 2 +- 13 files changed, 52 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 19633a3ce6a02..ed5430ac53d7a 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging { "spark.streaming.fileStream.minRememberDuration" -> Seq( AlternateConfig("spark.streaming.minRememberDuration", "1.5")), "spark.yarn.max.executor.failures" -> Seq( - AlternateConfig("spark.yarn.max.worker.failures", "1.5")) + AlternateConfig("spark.yarn.max.worker.failures", "1.5")), + "spark.unsafe.offHeap" -> Seq( + AlternateConfig("spark.memory.useOffHeap", "1.6")) ) /** diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index ae9e1ac0e246b..beae156fde5a7 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -182,7 +182,11 @@ private[spark] abstract class MemoryManager( * sun.misc.Unsafe. 
*/ final val tungstenMemoryMode: MemoryMode = { - if (conf.getBoolean("spark.unsafe.offHeap", false)) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP + if (conf.getBoolean("spark.memory.useOffHeap", false)) { + MemoryMode.OFF_HEAP + } else { + MemoryMode.ON_HEAP + } } /** diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index 711eed0193bc0..7fb007e879a58 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite { public void leakedPageMemoryIsDetected() { final TaskMemoryManager manager = new TaskMemoryManager( new StaticMemoryManager( - new SparkConf().set("spark.unsafe.offHeap", "false"), + new SparkConf().set("spark.memory.useOffHeap", "false"), Long.MAX_VALUE, Long.MAX_VALUE, 1), @@ -42,7 +42,7 @@ public void leakedPageMemoryIsDetected() { @Test public void encodePageNumberAndOffsetOffHeap() { final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "true")), 0); + new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "true")), 0); final MemoryBlock dataPage = manager.allocatePage(256, null); // In off-heap mode, an offset is an absolute address that may require more than 51 bits to // encode. This test exercises that corner-case: @@ -55,7 +55,7 @@ public void encodePageNumberAndOffsetOffHeap() { @Test public void encodePageNumberAndOffsetOnHeap() { final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0); + new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")), 0); final MemoryBlock dataPage = manager.allocatePage(256, null); final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64); Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress)); @@ -104,4 +104,13 @@ public void cooperativeSpilling() { assert(manager.cleanUpAllAllocatedMemory() == 0); } + @Test + public void offHeapConfigurationBackwardsCompatibility() { + // Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which + // was deprecated in Spark 1.6 and replaced by `spark.memory.useOffHeap` (see SPARK-12251). 
+ final TaskMemoryManager manager = new TaskMemoryManager( + new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "true")), 0); + assert(manager.tungstenMemoryMode == MemoryMode.OFF_HEAP); + } + } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java index 9a43f1f3a9235..f1905448f5d22 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java @@ -35,7 +35,7 @@ public class PackedRecordPointerSuite { @Test public void heap() throws IOException { - final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false"); + final SparkConf conf = new SparkConf().set("spark.memory.useOffHeap", "false"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock page0 = memoryManager.allocatePage(128, null); @@ -54,7 +54,7 @@ public void heap() throws IOException { @Test public void offHeap() throws IOException { - final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "true"); + final SparkConf conf = new SparkConf().set("spark.memory.useOffHeap", "true"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock page0 = memoryManager.allocatePage(128, null); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index faa5a863ee630..23c4d0732b7ba 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -34,7 +34,7 @@ public class ShuffleInMemorySorterSuite { final TestMemoryManager memoryManager = - new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")); + new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")); final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); final TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager); @@ -64,7 +64,7 @@ public void testBasicSorting() throws Exception { "Lychee", "Mango" }; - final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false"); + final SparkConf conf = new SparkConf().set("spark.memory.useOffHeap", "false"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock dataPage = memoryManager.allocatePage(2048, null); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index bc85918c59aab..cc87dd2a9d952 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -108,7 +108,7 @@ public void setUp() throws IOException { spillFilesCreated.clear(); conf = new SparkConf() .set("spark.buffer.pageSize", "1m") - .set("spark.unsafe.offHeap", "false"); + .set("spark.memory.useOffHeap", "false"); taskMetrics = new TaskMetrics(); memoryManager = new TestMemoryManager(conf); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 8724a34988421..812b715cf1473 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -85,7 +85,7 @@ public void setup() {
     memoryManager =
       new TestMemoryManager(
         new SparkConf()
-          .set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())
+          .set("spark.memory.useOffHeap", "" + useOffHeapMemoryAllocator())
           .set("spark.memory.offHeapSize", "256mb"));
     taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index a1c9f6fab8e65..cd5fd9f189475 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -58,7 +58,7 @@ public class UnsafeExternalSorterSuite {
   final LinkedList<File> spillFilesCreated = new LinkedList<File>();
   final TestMemoryManager memoryManager =
-    new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+    new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false"));
   final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
   // Use integer comparison for comparing prefixes (which are partition ids, in this case)
   final PrefixComparator prefixComparator = new PrefixComparator() {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index a203a09648ac0..a9fb10d4c5a9c 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -46,7 +46,7 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset,
   @Test
   public void testSortingEmptyInput() {
     final TaskMemoryManager memoryManager = new TaskMemoryManager(
-      new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+      new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")), 0);
     final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
     final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer,
       memoryManager,
@@ -71,7 +71,7 @@ public void testSortingOnlyByIntegerPrefix() throws Exception {
       "Mango" };
     final TaskMemoryManager memoryManager = new TaskMemoryManager(
-      new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+      new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")), 0);
     final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
     final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
     final Object baseObject = dataPage.getBaseObject();
diff --git a/docs/configuration.md b/docs/configuration.md
index fd61ddc244f44..ede159a83d941 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -738,6 +738,22 @@ Apart from these, the following properties are also available, and may be useful
     this description.
   </td>
 </tr>
+<tr>
+  <td><code>spark.memory.useOffHeap</code></td>
+  <td>false</td>
+  <td>
+    If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then <code>spark.memory.offHeapSize</code> must be positive.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.memory.offHeapSize</code></td>
+  <td>0</td>
+  <td>
+    The absolute amount of memory which can be used for off-heap allocation.
+    This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
+    This must be set to a positive value when <code>spark.memory.useOffHeap=true</code>.
+  </td>
+</tr>
 <tr>
   <td><code>spark.memory.useLegacyMode</code></td>
   <td>false</td>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index aebfea5832402..51eede3cfe960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -334,7 +334,11 @@ private[joins] final class UnsafeHashedRelation(
     // so that tests compile:
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.unsafe.offHeap", "false"), Long.MaxValue, Long.MaxValue, 1), 0)
+        new SparkConf().set("spark.memory.useOffHeap", "false"),
+        Long.MaxValue,
+        Long.MaxValue,
+        1),
+      0)
     val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
       .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 7ceaee38d131b..7d6da05450744 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -61,7 +61,7 @@ class UnsafeFixedWidthAggregationMapSuite
     }
 
     test(name) {
-      val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
+      val conf = new SparkConf().set("spark.memory.useOffHeap", "false")
       memoryManager = new TestMemoryManager(conf)
       taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 7b80963ec8708..2572b9785df20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -109,7 +109,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       pageSize: Long,
       spill: Boolean): Unit = {
     val memoryManager =
-      new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
+      new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false"))
     val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
     TaskContext.setTaskContext(new TaskContextImpl(
       stageId = 0,

From f69f4383c66ff1b7bb8664fe2bdffba46254db1c Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 9 Dec 2015 18:15:30 -0800
Subject: [PATCH 2/5] Add test to check that offHeap memory size is configured
 when off heap is used

---
 .../org/apache/spark/memory/MemoryManager.scala      |  2 ++
 .../apache/spark/memory/TaskMemoryManagerSuite.java  | 12 ++++++++----
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index beae156fde5a7..67572c36c0478 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -183,6 +183,8 @@ private[spark] abstract class MemoryManager( */ final val tungstenMemoryMode: MemoryMode = { if (conf.getBoolean("spark.memory.useOffHeap", false)) { + require(conf.getSizeAsBytes("spark.memory.offHeapSize", 0) > 0, + "spark.memory.offHeapSize must be > 0 when spark.memory.useOffHeap == true") MemoryMode.OFF_HEAP } else { MemoryMode.ON_HEAP diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index 7fb007e879a58..17484bf9774f3 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -41,8 +41,10 @@ public void leakedPageMemoryIsDetected() { @Test public void encodePageNumberAndOffsetOffHeap() { - final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "true")), 0); + final SparkConf conf = new SparkConf() + .set("spark.memory.useOffHeap", "true") + .set("spark.memory.offHeapSize", "1000"); + final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock dataPage = manager.allocatePage(256, null); // In off-heap mode, an offset is an absolute address that may require more than 51 bits to // encode. This test exercises that corner-case: @@ -108,8 +110,10 @@ public void cooperativeSpilling() { public void offHeapConfigurationBackwardsCompatibility() { // Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which // was deprecated in Spark 1.6 and replaced by `spark.memory.useOffHeap` (see SPARK-12251). - final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "true")), 0); + final SparkConf conf = new SparkConf() + .set("spark.unsafe.offHeap", "true") + .set("spark.memory.offHeapSize", "1000"); + final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); assert(manager.tungstenMemoryMode == MemoryMode.OFF_HEAP); } From 2dd2d9e289e5259393fa5ad9d662aa20aaf5a967 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 9 Dec 2015 18:20:49 -0800 Subject: [PATCH 3/5] Fix mis-ordered parameters --- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index ed5430ac53d7a..24f094943bc12 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -598,8 +598,8 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.streaming.minRememberDuration", "1.5")), "spark.yarn.max.executor.failures" -> Seq( AlternateConfig("spark.yarn.max.worker.failures", "1.5")), - "spark.unsafe.offHeap" -> Seq( - AlternateConfig("spark.memory.useOffHeap", "1.6")) + "spark.memory.useOffHeap" -> Seq( + AlternateConfig("spark.unsafe.offHeap", "1.6")) ) /** From 216fc466377afe39f2cf71beb68f2f8fd6b6b753 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 9 Dec 2015 19:49:14 -0800 Subject: [PATCH 4/5] More conf. renaming. 
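
This settles on a `spark.memory.offHeap.*` namespace: `spark.memory.useOffHeap`
becomes `spark.memory.offHeap.enabled`, and `spark.memory.offHeapSize` becomes
`spark.memory.offHeap.size`, so the two related settings read as a pair. A
minimal sketch of the resulting user-facing configuration (the 2g value is
illustrative, not a recommendation):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.memory.offHeap.enabled", "true") // defaults to false
      .set("spark.memory.offHeap.size", "2g")      // must be > 0 when enabled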
--- core/src/main/scala/org/apache/spark/SparkConf.scala | 2 +- .../org/apache/spark/memory/MemoryManager.scala | 8 ++++---- .../apache/spark/memory/TaskMemoryManagerSuite.java | 12 ++++++------ .../spark/shuffle/sort/PackedRecordPointerSuite.java | 4 ++-- .../shuffle/sort/ShuffleInMemorySorterSuite.java | 4 ++-- .../spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- .../unsafe/map/AbstractBytesToBytesMapSuite.java | 4 ++-- .../unsafe/sort/UnsafeExternalSorterSuite.java | 2 +- .../unsafe/sort/UnsafeInMemorySorterSuite.java | 4 ++-- .../spark/memory/StaticMemoryManagerSuite.scala | 2 +- .../spark/memory/UnifiedMemoryManagerSuite.scala | 2 +- docs/configuration.md | 8 ++++---- .../spark/sql/execution/joins/HashedRelation.scala | 2 +- .../UnsafeFixedWidthAggregationMapSuite.scala | 2 +- .../sql/execution/UnsafeKVExternalSorterSuite.scala | 2 +- 15 files changed, 30 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 24f094943bc12..d3384fb297732 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -598,7 +598,7 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.streaming.minRememberDuration", "1.5")), "spark.yarn.max.executor.failures" -> Seq( AlternateConfig("spark.yarn.max.worker.failures", "1.5")), - "spark.memory.useOffHeap" -> Seq( + "spark.memory.offHeap.enabled" -> Seq( AlternateConfig("spark.unsafe.offHeap", "1.6")) ) diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 67572c36c0478..e707e27d96b50 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -50,7 +50,7 @@ private[spark] abstract class MemoryManager( storageMemoryPool.incrementPoolSize(storageMemory) onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory) - offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0)) + offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0)) /** * Total available memory for storage, in bytes. This amount can vary over time, depending on @@ -182,9 +182,9 @@ private[spark] abstract class MemoryManager( * sun.misc.Unsafe. 
*/ final val tungstenMemoryMode: MemoryMode = { - if (conf.getBoolean("spark.memory.useOffHeap", false)) { - require(conf.getSizeAsBytes("spark.memory.offHeapSize", 0) > 0, - "spark.memory.offHeapSize must be > 0 when spark.memory.useOffHeap == true") + if (conf.getBoolean("spark.memory.offHeap.enabled", false)) { + require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0, + "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true") MemoryMode.OFF_HEAP } else { MemoryMode.ON_HEAP diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index 17484bf9774f3..776a2997cf91f 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite { public void leakedPageMemoryIsDetected() { final TaskMemoryManager manager = new TaskMemoryManager( new StaticMemoryManager( - new SparkConf().set("spark.memory.useOffHeap", "false"), + new SparkConf().set("spark.memory.offHeap.enabled", "false"), Long.MAX_VALUE, Long.MAX_VALUE, 1), @@ -42,8 +42,8 @@ public void leakedPageMemoryIsDetected() { @Test public void encodePageNumberAndOffsetOffHeap() { final SparkConf conf = new SparkConf() - .set("spark.memory.useOffHeap", "true") - .set("spark.memory.offHeapSize", "1000"); + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1000"); final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock dataPage = manager.allocatePage(256, null); // In off-heap mode, an offset is an absolute address that may require more than 51 bits to @@ -57,7 +57,7 @@ public void encodePageNumberAndOffsetOffHeap() { @Test public void encodePageNumberAndOffsetOnHeap() { final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")), 0); + new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); final MemoryBlock dataPage = manager.allocatePage(256, null); final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64); Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress)); @@ -109,10 +109,10 @@ public void cooperativeSpilling() { @Test public void offHeapConfigurationBackwardsCompatibility() { // Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which - // was deprecated in Spark 1.6 and replaced by `spark.memory.useOffHeap` (see SPARK-12251). + // was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251). 
final SparkConf conf = new SparkConf() .set("spark.unsafe.offHeap", "true") - .set("spark.memory.offHeapSize", "1000"); + .set("spark.memory.offHeap.size", "1000"); final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); assert(manager.tungstenMemoryMode == MemoryMode.OFF_HEAP); } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java index f1905448f5d22..9f9e322bfdf46 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java @@ -35,7 +35,7 @@ public class PackedRecordPointerSuite { @Test public void heap() throws IOException { - final SparkConf conf = new SparkConf().set("spark.memory.useOffHeap", "false"); + final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock page0 = memoryManager.allocatePage(128, null); @@ -54,7 +54,7 @@ public void heap() throws IOException { @Test public void offHeap() throws IOException { - final SparkConf conf = new SparkConf().set("spark.memory.useOffHeap", "true"); + final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "true"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock page0 = memoryManager.allocatePage(128, null); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index 23c4d0732b7ba..0328e63e45439 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -34,7 +34,7 @@ public class ShuffleInMemorySorterSuite { final TestMemoryManager memoryManager = - new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")); + new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")); final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); final TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager); @@ -64,7 +64,7 @@ public void testBasicSorting() throws Exception { "Lychee", "Mango" }; - final SparkConf conf = new SparkConf().set("spark.memory.useOffHeap", "false"); + final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock dataPage = memoryManager.allocatePage(2048, null); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index cc87dd2a9d952..5fe64bde3604a 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -108,7 +108,7 @@ public void setUp() throws IOException { spillFilesCreated.clear(); conf = new SparkConf() .set("spark.buffer.pageSize", "1m") - .set("spark.memory.useOffHeap", "false"); + .set("spark.memory.offHeap.enabled", "false"); taskMetrics = new TaskMetrics(); memoryManager = new TestMemoryManager(conf); taskMemoryManager = new 
TaskMemoryManager(memoryManager, 0); diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 812b715cf1473..702ba5469b8b4 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -85,8 +85,8 @@ public void setup() { memoryManager = new TestMemoryManager( new SparkConf() - .set("spark.memory.useOffHeap", "" + useOffHeapMemoryAllocator()) - .set("spark.memory.offHeapSize", "256mb")); + .set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator()) + .set("spark.memory.offHeap.size", "256mb")); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test"); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index cd5fd9f189475..e0ee281e98b71 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -58,7 +58,7 @@ public class UnsafeExternalSorterSuite { final LinkedList spillFilesCreated = new LinkedList(); final TestMemoryManager memoryManager = - new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")); + new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")); final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); // Use integer comparison for comparing prefixes (which are partition ids, in this case) final PrefixComparator prefixComparator = new PrefixComparator() { diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index a9fb10d4c5a9c..93efd033eb940 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -46,7 +46,7 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset, @Test public void testSortingEmptyInput() { final TaskMemoryManager memoryManager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")), 0); + new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, @@ -71,7 +71,7 @@ public void testSortingOnlyByIntegerPrefix() throws Exception { "Mango" }; final TaskMemoryManager memoryManager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")), 0); + new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); final MemoryBlock dataPage = memoryManager.allocatePage(2048, null); final Object baseObject = dataPage.getBaseObject(); diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 272253bc94e91..68cf26fc3ed5d 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -47,7 +47,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
       conf.clone
         .set("spark.memory.fraction", "1")
         .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-        .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+        .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
       maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
       maxStorageMemory = 0,
       numCores = 1)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 71221deeb4c28..e21a028b7faec 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -42,7 +42,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val conf = new SparkConf()
       .set("spark.memory.fraction", "1")
       .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-      .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString)
+      .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString)
       .set("spark.memory.storageFraction", storageFraction.toString)
     UnifiedMemoryManager(conf, numCores = 1)
   }
diff --git a/docs/configuration.md b/docs/configuration.md
index ede159a83d941..5629c4999a6cc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -739,19 +739,19 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.memory.useOffHeap</code></td>
+  <td><code>spark.memory.offHeap.enabled</code></td>
   <td>false</td>
   <td>
-    If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then <code>spark.memory.offHeapSize</code> must be positive.
+    If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then <code>spark.memory.offHeap.size</code> must be positive.
   </td>
 </tr>
 <tr>
-  <td><code>spark.memory.offHeapSize</code></td>
+  <td><code>spark.memory.offHeap.size</code></td>
   <td>0</td>
   <td>
     The absolute amount of memory which can be used for off-heap allocation.
     This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
-    This must be set to a positive value when <code>spark.memory.useOffHeap=true</code>.
+    This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.
   </td>
 </tr>
 <tr>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 51eede3cfe960..8c7099ab5a34d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -334,7 +334,7 @@ private[joins] final class UnsafeHashedRelation( // so that tests compile: val taskMemoryManager = new TaskMemoryManager( new StaticMemoryManager( - new SparkConf().set("spark.memory.useOffHeap", "false"), + new SparkConf().set("spark.memory.offHeap.enabled", "false"), Long.MaxValue, Long.MaxValue, 1), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala index 7d6da05450744..5a8406789ab81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala @@ -61,7 +61,7 @@ class UnsafeFixedWidthAggregationMapSuite } test(name) { - val conf = new SparkConf().set("spark.memory.useOffHeap", "false") + val conf = new SparkConf().set("spark.memory.offHeap.enabled", "false") memoryManager = new TestMemoryManager(conf) taskMemoryManager = new TaskMemoryManager(memoryManager, 0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala index 2572b9785df20..29027a664b4b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala @@ -109,7 +109,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext { pageSize: Long, spill: Boolean): Unit = { val memoryManager = - new TestMemoryManager(new SparkConf().set("spark.memory.useOffHeap", "false")) + new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")) val taskMemMgr = new TaskMemoryManager(memoryManager, 0) TaskContext.setTaskContext(new TaskContextImpl( stageId = 0, From 12ffce3c472ed8118db891b98183a8230bf2eaa0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 10 Dec 2015 12:22:24 -0800 Subject: [PATCH 5/5] Fixed PackedRecordPointerSuite test. --- .../apache/spark/shuffle/sort/PackedRecordPointerSuite.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java index 9f9e322bfdf46..fe5abc5c23049 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java @@ -54,7 +54,9 @@ public void heap() throws IOException { @Test public void offHeap() throws IOException { - final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "true"); + final SparkConf conf = new SparkConf() + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "10000"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock page0 = memoryManager.allocatePage(128, null);
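
Net behavior after the full series (a minimal sketch, assuming Spark 1.6 with
these five patches applied; the property values are illustrative):

    import org.apache.spark.SparkConf

    // Enabling off-heap memory without sizing it now fails fast when the
    // MemoryManager is constructed, via the require() added in patch 2:
    //   "spark.memory.offHeap.size must be > 0 when
    //    spark.memory.offHeap.enabled == true"
    val invalid = new SparkConf().set("spark.memory.offHeap.enabled", "true")

    // The deprecated key keeps working: the AlternateConfig entry added to
    // SparkConf maps spark.unsafe.offHeap onto spark.memory.offHeap.enabled
    // (logging a deprecation warning), as exercised by the
    // offHeapConfigurationBackwardsCompatibility test above.
    val legacy = new SparkConf()
      .set("spark.unsafe.offHeap", "true")
      .set("spark.memory.offHeap.size", "1g")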