
Commit 1e73ee2

zsxwing authored and gatorsmile committed
[SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page (branch-2.2)
## What changes were proposed in this pull request?

Backport #22062 to branch-2.2. Just two minor differences in the test:

- branch-2.2 doesn't have `SparkOutOfMemoryError`. It's using `OutOfMemoryError` directly.
- MockitoSugar is in a different package in old scalatest.

## How was this patch tested?

Jenkins

Closes #22072 from zsxwing/SPARK-25081-2.2.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
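The second bullet refers to `MockitoSugar` having moved between ScalaTest releases. A minimal sketch of the difference, assuming master used ScalaTest 3.x at the time (the branch-2.2 location is visible in the test below; the master location is my inference, not taken from this patch):

```scala
import org.scalatest.mock.MockitoSugar       // ScalaTest 2.x, used by branch-2.2
// import org.scalatest.mockito.MockitoSugar // ScalaTest 3.x, presumably used by the master PR
```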
1 parent 051ea3a commit 1e73ee2

2 files changed: +121 −2 lines changed

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java

Lines changed: 10 additions & 2 deletions
```diff
@@ -66,7 +66,7 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
    */
   private int usableCapacity = 0;
 
-  private int initialSize;
+  private final int initialSize;
 
   ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
     this.consumer = consumer;
@@ -95,12 +95,20 @@ public int numRecords() {
   }
 
   public void reset() {
+    // Reset `pos` here so that a `spill` triggered by the `allocateArray` call below is a no-op.
+    pos = 0;
     if (consumer != null) {
       consumer.freeArray(array);
+      // As `array` has been released, we should set it to `null` to avoid accessing it before
+      // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any code writing
+      // data to `ShuffleInMemorySorter` while `array` is `null` (e.g., in
+      // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
+      // `ShuffleInMemorySorter` when `allocateArray` throws OutOfMemoryError).
+      array = null;
+      usableCapacity = 0;
       array = consumer.allocateArray(initialSize);
       usableCapacity = getUsableCapacity();
     }
-    pos = 0;
   }
 
   public void expandPointerArray(LongArray newArray) {
```
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala (new file)

Lines changed: 111 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort

import java.lang.{Long => JLong}

import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar

import org.apache.spark._
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.memory._
import org.apache.spark.unsafe.Platform

class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {

  test("nested spill should be no-op") {
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("ShuffleExternalSorterSuite")
      .set("spark.testing", "true")
      .set("spark.testing.memory", "1600")
      .set("spark.memory.fraction", "1")
    sc = new SparkContext(conf)

    val memoryManager = UnifiedMemoryManager(conf, 1)

    var shouldAllocate = false

    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
    // This will trigger a nested spill and expose issues if we don't handle this case properly.
    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task to
        // use. So we leave 400 bytes for the task.
        if (shouldAllocate &&
          memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
          val acquireExecutionMemoryMethod =
            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
          acquireExecutionMemoryMethod.invoke(
            memoryManager,
            JLong.valueOf(
              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
            JLong.valueOf(1L), // taskAttemptId
            MemoryMode.ON_HEAP
          ).asInstanceOf[java.lang.Long]
        }
        super.acquireExecutionMemory(required, consumer)
      }
    }
    val taskContext = mock[TaskContext]
    val taskMetrics = new TaskMetrics
    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
    val sorter = new ShuffleExternalSorter(
      taskMemoryManager,
      sc.env.blockManager,
      taskContext,
      100, // initialSize - this requires ShuffleInMemorySorter to acquire at least 800 bytes
      1, // numPartitions
      conf,
      new ShuffleWriteMetrics)
    val inMemSorter = {
      val field = sorter.getClass.getDeclaredField("inMemSorter")
      field.setAccessible(true)
      field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
    }
    // Allocate memory so that the next `insertRecord` call triggers a spill.
    val bytes = new Array[Byte](1)
    while (inMemSorter.hasSpaceForAnotherRecord) {
      sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
    }

    // This flag will make the mocked TaskMemoryManager acquire the free memory released by the
    // spill, triggering a nested spill.
    shouldAllocate = true

    // Should throw `OutOfMemoryError` as there is not enough memory: `ShuffleInMemorySorter`
    // will try to acquire 800 bytes but only 400 bytes are available.
    //
    // Before the fix, a nested spill could use a released page, causing two tasks to access the
    // same memory page. When a task reads memory written by another task, many kinds of failures
    // may happen. Here are some examples we have seen:
    //
    // - JVM crash. (This is easy to reproduce in the unit test as we fill newly allocated and
    //   deallocated memory with 0xa5 and 0x5a bytes, which usually points to an invalid memory
    //   address.)
    // - java.lang.IllegalArgumentException: Comparison method violates its general contract!
    // - java.lang.NullPointerException
    //     at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
    // - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912
    //   because the size after growing exceeds size limitation 2147483632
    intercept[OutOfMemoryError] {
      sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
    }
  }
}
```
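The constants in the test fit together arithmetically. A back-of-envelope sketch (my own arithmetic, not part of the suite; the `poolSize / (2 * activeTasks)` fairness floor and the assumption that two task attempt ids are active — 0 for the sorter, 1 used by the mocked manager — are my inferences from the comments above):

```scala
// Hypothetical check of the memory budget behind the test's constants.
object TestMemoryBudget extends App {
  val poolSize    = 1600L                         // spark.testing.memory, memory.fraction = 1
  val activeTasks = 2L                            // assumed: task attempt ids 0 and 1
  val taskFloor   = poolSize / (2 * activeTasks)  // = 400 bytes left for the task
  val arrayBytes  = 100L * 8L                     // initialSize pointers of 8 bytes = 800 bytes
  // reset() cannot reacquire its 800-byte array from a 400-byte floor,
  // so the final insertRecord must throw OutOfMemoryError.
  assert(arrayBytes > taskFloor)
}
```

To run the suite standalone, something like `build/sbt "core/test-only *ShuffleExternalSorterSuite"` should work with branch-2.2's sbt setup, though the exact task invocation is an assumption.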
