10 changes: 5 additions & 5 deletions core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -83,10 +83,10 @@ public void spill() throws IOException {
public abstract long spill(long size, MemoryConsumer trigger) throws IOException;

/**
- * Allocates a LongArray of `size`. Note that this method may throw `OutOfMemoryError` if Spark
- * doesn't have enough memory for this allocation, or throw `TooLargePageException` if this
- * `LongArray` is too large to fit in a single page. The caller side should take care of these
- * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
+ * Allocates a LongArray of `size`. Note that this method may throw `SparkOutOfMemoryError`
+ * if Spark doesn't have enough memory for this allocation, or throw `TooLargePageException`
+ * if this `LongArray` is too large to fit in a single page. The caller side should take care of
+ * these two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
*
* @throws SparkOutOfMemoryError
* @throws TooLargePageException
@@ -111,7 +111,7 @@ public void freeArray(LongArray array) {
/**
* Allocate a memory block with at least `required` bytes.
*
- * @throws OutOfMemoryError
+ * @throws SparkOutOfMemoryError
*/
protected MemoryBlock allocatePage(long required) {
MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
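The updated contract makes callers responsible for catching `SparkOutOfMemoryError` (or for keeping `size` small enough to dodge both exceptions). A minimal sketch of that caller-side pattern, assuming a hypothetical `MemoryConsumer` subclass — the class name, field, and fallback behavior here are illustrative, not part of this PR:

```java
import java.io.IOException;

import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.array.LongArray;

// Illustrative consumer showing the caller-side handling the javadoc asks for.
class GrowableConsumer extends MemoryConsumer {
  private LongArray pointers;

  GrowableConsumer(TaskMemoryManager taskMemoryManager) {
    super(taskMemoryManager);
  }

  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    return 0L;  // nothing to spill in this sketch
  }

  /** Try to double the array; under memory pressure, keep the old one. */
  void grow() {
    final LongArray bigger;
    try {
      // May throw SparkOutOfMemoryError; keeping the request modest avoids
      // the TooLargePageException case the javadoc also warns about.
      bigger = allocateArray(pointers.size() * 2);
    } catch (SparkOutOfMemoryError e) {
      return;  // not enough execution memory: the caller can spill instead
    }
    // ... copy the old contents into `bigger` ...
    freeArray(pointers);
    pointers = bigger;
  }
}
```

This mirrors what the call sites below do: treat the memory manager's failure as a recoverable signal rather than letting it escape as a task-killing error.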
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -31,6 +31,7 @@
import org.apache.spark.SparkEnv;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockManager;
@@ -741,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
try {
growAndRehash();
-} catch (OutOfMemoryError oom) {
+} catch (SparkOutOfMemoryError oom) {
Contributor (inline review comment):
do you know what was the behavior before? Will we propagate the exception all the way up and kill the executor?

canGrowArray = false;
}
}
@@ -757,7 +758,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
private boolean acquireNewPage(long required) {
try {
currentPage = allocatePage(required);
-} catch (OutOfMemoryError e) {
+} catch (SparkOutOfMemoryError e) {
return false;
}
dataPages.add(currentPage);
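What makes the narrower catch safe: `SparkOutOfMemoryError` is Spark's own subclass of `java.lang.OutOfMemoryError`, thrown when a task cannot acquire execution memory. Catching the subclass keeps the fallback behavior (`canGrowArray = false`, `return false`) while a genuine JVM heap OOM now propagates instead of being silently swallowed. A small self-contained illustration — not from the PR, and it assumes the `String` constructor `SparkOutOfMemoryError` had at the time:

```java
import org.apache.spark.memory.SparkOutOfMemoryError;

public class CatchDemo {
  public static void main(String[] args) {
    try {
      // Spark's memory manager throws this subclass of OutOfMemoryError.
      throw new SparkOutOfMemoryError("Unable to acquire 64 bytes of memory");
    } catch (SparkOutOfMemoryError oom) {
      // Recoverable path, as in BytesToBytesMap: disable growth and move on.
      System.out.println("handled: " + oom.getMessage());
    }
    // A bare `new OutOfMemoryError(...)` thrown in the try block above would
    // NOT match this catch and would propagate -- the right outcome for a
    // real JVM out-of-memory condition.
  }
}
```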
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -38,6 +38,7 @@
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.internal.config.package$;
import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.serializer.SerializerInstance;
@@ -534,10 +535,10 @@ public void testOOMDuringSpill() throws Exception {
insertNumber(sorter, 1024);
fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
}
-// we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
-catch (OutOfMemoryError oom){
+// we expect a SparkOutOfMemoryError here; anything else (i.e. the original NPE) is a failure
+catch (SparkOutOfMemoryError oom) {
String oomStackTrace = Utils.exceptionString(oom);
assertThat("expected OutOfMemoryError in " +
assertThat("expected SparkOutOfMemoryError in " +
"org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
oomStackTrace,
Matchers.containsString(
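Both test suites lean on the same trick: `TestMemoryManager.markExecutionAsOutOfMemoryOnce()` makes exactly one upcoming memory acquisition fail, so the test can assert that the typed `SparkOutOfMemoryError` surfaces rather than the old NPE. A condensed sketch of that pattern — the suite name and the `TestMemoryConsumer` target are illustrative stand-ins for the sorters under test:

```java
import static org.junit.Assert.fail;

import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TestMemoryConsumer;
import org.apache.spark.memory.TestMemoryManager;

public class TypedOomPatternSuite {
  @Test
  public void allocationFailureSurfacesTypedError() {
    TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
    TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
    TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager);

    memoryManager.markExecutionAsOutOfMemoryOnce();  // next acquisition is denied
    try {
      consumer.allocateArray(1024L);  // any operation that needs fresh memory
      fail("expected SparkOutOfMemoryError but the operation succeeded");
    } catch (SparkOutOfMemoryError expected) {
      // the typed subclass was thrown -- not a bare OutOfMemoryError, not an NPE
    }
  }
}
```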
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -27,6 +27,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.memory.TestMemoryConsumer;
import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -178,8 +179,8 @@ public int compare(
testMemoryManager.markExecutionAsOutOfMemoryOnce();
try {
sorter.reset();
fail("expected OutOfMmoryError but it seems operation surprisingly succeeded");
} catch (OutOfMemoryError oom) {
fail("expected SparkOutOfMemoryError but it seems operation surprisingly succeeded");
} catch (SparkOutOfMemoryError oom) {
// as expected
}
// [SPARK-21907] this failed on NPE at
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
@@ -20,6 +20,7 @@
import java.io.IOException;

import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -126,7 +127,7 @@ public final void close() {
private boolean acquirePage(long requiredSize) {
try {
page = allocatePage(requiredSize);
-} catch (OutOfMemoryError e) {
+} catch (SparkOutOfMemoryError e) {
logger.warn("Failed to allocate page ({} bytes).", requiredSize);
return false;
}
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Closeables

import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.io.NioBufferedFileInputStream
-import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryConsumer, SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.unsafe.Platform
@@ -226,7 +226,7 @@ private[python] case class HybridRowQueue(
val page = try {
allocatePage(required)
} catch {
-case _: OutOfMemoryError =>
+case _: SparkOutOfMemoryError =>
null
}
val buffer = if (page != null) {