Commit 66450bb

[SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever
## What changes were proposed in this pull request?

`ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large but not enough memory is available. When this happens, `BlockTransferService.fetchBlockSync` currently hangs forever, because its `BlockFetchingListener.onBlockFetchSuccess` never completes the `Promise`. This PR catches `Throwable` and uses the error to complete the `Promise`.

## How was this patch tested?

Added a unit test. Since I cannot make `ByteBuffer.allocate` throw `OutOfMemoryError` in a test, I passed a negative size to make `ByteBuffer.allocate` fail. Although the error type is different, it should trigger the same code path.

Closes #23590 from zsxwing/SPARK-26665.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
Parent: 64ce1c9
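To see why an uncompleted `Promise` makes `fetchBlockSync` hang, here is a small standalone Scala sketch (illustrative only, not Spark code; the object and method names are made up). It mimics a listener that completes its promise only on the success path, and toggles the fix's behavior of completing it with the failure:

    import java.nio.ByteBuffer
    import scala.concurrent.{Await, Promise, TimeoutException}
    import scala.concurrent.duration._

    object FetchHangSketch {
      // Mimics the pre-fix listener: the Promise is completed only on the
      // success path, so any Throwable from ByteBuffer.allocate leaves it
      // pending and the synchronous waiter blocks.
      def fetch(size: Int, completeOnFailure: Boolean): String = {
        val result = Promise[ByteBuffer]()
        try {
          result.success(ByteBuffer.allocate(size)) // throws IllegalArgumentException for size < 0
        } catch {
          case t: Throwable =>
            if (completeOnFailure) result.failure(t) // the SPARK-26665 fix
            // otherwise: error lost, Promise never completed (pre-fix behavior)
        }
        try {
          Await.result(result.future, 2.seconds)
          "completed"
        } catch {
          case _: TimeoutException => "hung until timeout: promise never completed"
          case t: Throwable => s"failed fast with $t"
        }
      }

      def main(args: Array[String]): Unit = {
        println(fetch(-1, completeOnFailure = false)) // blocks the full 2 seconds
        println(fetch(-1, completeOnFailure = true))  // fails immediately
      }
    }

In real Spark the wait is unbounded rather than 2 seconds, which is why the uncompleted promise meant hanging forever rather than timing out.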

File tree: 2 files changed, +112 −4 lines


core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 8 additions & 4 deletions
@@ -107,10 +107,14 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging
             case e: EncryptedManagedBuffer =>
               result.success(e)
             case _ =>
-              val ret = ByteBuffer.allocate(data.size.toInt)
-              ret.put(data.nioByteBuffer())
-              ret.flip()
-              result.success(new NioManagedBuffer(ret))
+              try {
+                val ret = ByteBuffer.allocate(data.size.toInt)
+                ret.put(data.nioByteBuffer())
+                ret.flip()
+                result.success(new NioManagedBuffer(ret))
+              } catch {
+                case e: Throwable => result.failure(e)
+              }
           }
         }
       }, tempFileManager)
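One detail of the fix above: the new catch clause matches `Throwable` rather than `scala.util.control.NonFatal`, presumably because `NonFatal` deliberately excludes `OutOfMemoryError` (a `VirtualMachineError`), which is exactly the failure this bug is about. A quick standalone illustration of that distinction (not Spark code):

    import scala.util.control.NonFatal

    object NonFatalSketch {
      def main(args: Array[String]): Unit = {
        // NonFatal treats VirtualMachineError (including OutOfMemoryError) as
        // fatal and does not match it, so a `case NonFatal(e)` handler would
        // have missed the very error this fix needs to capture.
        println(NonFatal(new OutOfMemoryError("simulated")))   // false
        println(NonFatal(new IllegalArgumentException("bad"))) // true
      }
    }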
core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala

Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+import org.scalatest.concurrent._
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
+import org.apache.spark.storage.{BlockId, StorageLevel}
+
+class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits {
+
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
+  test("fetchBlockSync should not hang when BlockFetchingListener.onBlockFetchSuccess fails") {
+    // Create a mocked `BlockTransferService` to call `BlockFetchingListener.onBlockFetchSuccess`
+    // with a bad `ManagedBuffer` which will trigger an exception in `onBlockFetchSuccess`.
+    val blockTransferService = new BlockTransferService {
+      override def init(blockDataManager: BlockDataManager): Unit = {}
+
+      override def close(): Unit = {}
+
+      override def port: Int = 0
+
+      override def hostName: String = "localhost-unused"
+
+      override def fetchBlocks(
+          host: String,
+          port: Int,
+          execId: String,
+          blockIds: Array[String],
+          listener: BlockFetchingListener,
+          tempFileManager: DownloadFileManager): Unit = {
+        // Notify BlockFetchingListener with a bad ManagedBuffer asynchronously
+        new Thread() {
+          override def run(): Unit = {
+            // This is a bad buffer to trigger `IllegalArgumentException` in
+            // `BlockFetchingListener.onBlockFetchSuccess`. The real issue we hit is
+            // `ByteBuffer.allocate` throws `OutOfMemoryError`, but we cannot make it happen in
+            // a test. Instead, we use a negative size value to make `ByteBuffer.allocate` fail,
+            // and this should trigger the same code path as `OutOfMemoryError`.
+            val badBuffer = new ManagedBuffer {
+              override def size(): Long = -1
+
+              override def nioByteBuffer(): ByteBuffer = null
+
+              override def createInputStream(): InputStream = null
+
+              override def retain(): ManagedBuffer = this
+
+              override def release(): ManagedBuffer = this
+
+              override def convertToNetty(): AnyRef = null
+            }
+            listener.onBlockFetchSuccess("block-id-unused", badBuffer)
+          }
+        }.start()
+      }
+
+      override def uploadBlock(
+          hostname: String,
+          port: Int,
+          execId: String,
+          blockId: BlockId,
+          blockData: ManagedBuffer,
+          level: StorageLevel,
+          classTag: ClassTag[_]): Future[Unit] = {
+        // This method is unused in this test
+        throw new UnsupportedOperationException("uploadBlock")
+      }
+    }
+
+    val e = intercept[SparkException] {
+      failAfter(10.seconds) {
+        blockTransferService.fetchBlockSync(
+          "localhost-unused", 0, "exec-id-unused", "block-id-unused", null)
+      }
+    }
+    assert(e.getCause.isInstanceOf[IllegalArgumentException])
+  }
+}
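For context on the test's final assertion: `fetchBlockSync` waits on the promise via Spark's `ThreadUtils.awaitResult`, which rewraps a failed future's error in a `SparkException` with the original as the cause; that is consistent with the test intercepting `SparkException` and inspecting `getCause`. A minimal sketch of that wrapping behavior (`awaitResultSketch` is a hypothetical stand-in, not Spark's actual helper):

    import scala.concurrent.{Await, Awaitable, Promise}
    import scala.concurrent.duration._

    import org.apache.spark.SparkException

    object AwaitResultSketch {
      // Hypothetical stand-in for the awaitResult-style helper: a failed
      // future surfaces as a SparkException whose getCause is the original
      // error reported by the listener.
      def awaitResultSketch[T](awaitable: Awaitable[T], atMost: Duration): T = {
        try {
          Await.result(awaitable, atMost)
        } catch {
          case t: Throwable =>
            throw new SparkException("Exception thrown in awaitResult", t)
        }
      }

      def main(args: Array[String]): Unit = {
        val result = Promise[Int]()
        // With the fix, the listener completes the promise with the failure,
        // so the waiter fails fast and the original cause survives.
        result.failure(new IllegalArgumentException("negative capacity: -1"))
        try {
          awaitResultSketch(result.future, 1.second)
        } catch {
          case e: SparkException =>
            assert(e.getCause.isInstanceOf[IllegalArgumentException])
            println(s"wrapped as expected: ${e.getCause}")
        }
      }
    }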
