diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 8371deca7311..4bfd2d358f36 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -83,10 +83,10 @@ public void spill() throws IOException {
   public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
 
   /**
-   * Allocates a LongArray of `size`. Note that this method may throw `OutOfMemoryError` if Spark
-   * doesn't have enough memory for this allocation, or throw `TooLargePageException` if this
-   * `LongArray` is too large to fit in a single page. The caller side should take care of these
-   * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
+   * Allocates a LongArray of `size`. Note that this method may throw `SparkOutOfMemoryError`
+   * if Spark doesn't have enough memory for this allocation, or throw `TooLargePageException`
+   * if this `LongArray` is too large to fit in a single page. The caller side should take care
+   * of these two exceptions, or make sure the `size` is small enough that it won't trigger them.
    *
    * @throws SparkOutOfMemoryError
    * @throws TooLargePageException
@@ -111,7 +111,7 @@ public void freeArray(LongArray array) {
   /**
    * Allocate a memory block with at least `required` bytes.
    *
-   * @throws OutOfMemoryError
+   * @throws SparkOutOfMemoryError
    */
   protected MemoryBlock allocatePage(long required) {
     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9b6cbab38cbc..a4e88598f760 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -31,6 +31,7 @@
 import org.apache.spark.SparkEnv;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
@@ -741,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
         try {
           growAndRehash();
-        } catch (OutOfMemoryError oom) {
+        } catch (SparkOutOfMemoryError oom) {
           canGrowArray = false;
         }
       }
@@ -757,7 +758,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
   private boolean acquireNewPage(long required) {
     try {
       currentPage = allocatePage(required);
-    } catch (OutOfMemoryError e) {
+    } catch (SparkOutOfMemoryError e) {
       return false;
     }
     dataPages.add(currentPage);
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 411cd5cb5733..d1b29d90ad91 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -38,6 +38,7 @@
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.JavaSerializer;
 import org.apache.spark.serializer.SerializerInstance;
@@ -534,10 +535,10 @@ public void testOOMDuringSpill() throws Exception {
       insertNumber(sorter, 1024);
       fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
     }
-    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
-    catch (OutOfMemoryError oom){
+    // we expect a SparkOutOfMemoryError here; anything else (i.e. the original NPE) is a failure
+    catch (SparkOutOfMemoryError oom) {
       String oomStackTrace = Utils.exceptionString(oom);
-      assertThat("expected OutOfMemoryError in " +
+      assertThat("expected SparkOutOfMemoryError in " +
         "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
         oomStackTrace,
         Matchers.containsString(
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 85ffdca436e1..b0d485f0c953 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -27,6 +27,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -178,8 +179,8 @@ public int compare(
     testMemoryManager.markExecutionAsOutOfMemoryOnce();
     try {
       sorter.reset();
-      fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
-    } catch (OutOfMemoryError oom) {
+      fail("expected SparkOutOfMemoryError but the operation surprisingly succeeded");
+    } catch (SparkOutOfMemoryError oom) {
       // as expected
     }
     // [SPARK-21907] this failed on NPE at
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
index 460513816dfd..6344cf18c11b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -126,7 +127,7 @@ public final void close() {
   private boolean acquirePage(long requiredSize) {
     try {
       page = allocatePage(requiredSize);
-    } catch (OutOfMemoryError e) {
+    } catch (SparkOutOfMemoryError e) {
       logger.warn("Failed to allocate page ({} bytes).", requiredSize);
       return false;
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
index d2820ff335ec..eb12641f548a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Closeables
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.io.NioBufferedFileInputStream
-import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryConsumer, SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.unsafe.Platform
@@ -226,7 +226,7 @@ private[python] case class HybridRowQueue(
     val page = try {
       allocatePage(required)
     } catch {
-      case _: OutOfMemoryError =>
+      case _: SparkOutOfMemoryError =>
         null
     }
     val buffer = if (page != null) {
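Every hunk above applies the same pattern: `SparkOutOfMemoryError` extends `java.lang.OutOfMemoryError` but is thrown only when Spark declines to grant execution memory to a task, so catching it lets a memory consumer degrade gracefully without also swallowing a genuine JVM heap exhaustion, which should still propagate and kill the task. Below is a minimal sketch of that caller-side pattern, mirroring `BytesToBytesMap.acquireNewPage` above; `FallbackConsumer`, `tryAcquirePage`, and the fallback behavior are illustrative inventions, not part of this patch:

```java
import java.io.IOException;

import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;

// Hypothetical consumer that degrades gracefully when Spark-managed
// execution memory runs out, instead of letting the task die.
public class FallbackConsumer extends MemoryConsumer {
  private MemoryBlock currentPage = null;

  public FallbackConsumer(TaskMemoryManager taskMemoryManager) {
    super(taskMemoryManager);
  }

  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    return 0L;  // nothing to spill in this sketch
  }

  /** Returns true if a page was acquired, false if the caller should fall back. */
  boolean tryAcquirePage(long required) {
    try {
      currentPage = allocatePage(required);
      return true;
    } catch (SparkOutOfMemoryError e) {
      // Spark refused to grant execution memory to this task. This is
      // recoverable: report failure so the caller can spill, stop growing,
      // or switch to a disk-backed path. A plain java.lang.OutOfMemoryError
      // (real heap exhaustion) is deliberately not caught here.
      return false;
    }
  }
}
```

The narrower catch is the point of the patch: the old `catch (OutOfMemoryError ...)` blocks could also intercept a real heap OOM and let the task continue in an unrecoverable state.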