Skip to content

Commit 3a4ea84

Browse files
cxzl25Mridul Muralidharan
authored andcommitted
[SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle
### What changes were proposed in this pull request? This PR aims to avoid NPE in `SparkEnv.get.shuffleManager.unregisterShuffle`. ### Why are the changes needed? After SPARK-45762, the shuffle manager is initialized after the block manager, which means that when the driver cleans up the shuffle, the shuffle manager may not have been initialized yet, causing NPE. ``` 24/09/03 20:09:51,668 [dispatcher-Executor] INFO BlockManager: Initialized BlockManager: BlockManagerId(168, x, 25467, None) 24/09/03 20:09:51,684 [block-manager-storage-async-thread-pool-2] ERROR BlockManagerStorageEndpoint: Error in removing shuffle 29 java.lang.NullPointerException at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:61) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) at scala.util.Success.$anonfun$map$1(Try.scala:255) at scala.util.Success.map(Try.scala:213) at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA ### Was this patch authored or co-authored using generative AI tooling? No Closes #47977 from cxzl25/SPARK-49502. Authored-by: sychen <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent d5b79c0 commit 3a4ea84

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class SparkEnv (
7373

7474
// We initialize the ShuffleManager later in SparkContext and Executor to allow
7575
// user jars to define custom ShuffleManagers.
76-
private var _shuffleManager: ShuffleManager = _
76+
@volatile private var _shuffleManager: ShuffleManager = _
7777

7878
def shuffleManager: ShuffleManager = _shuffleManager
7979

core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,13 @@ class BlockManagerStorageEndpoint(
6060
if (mapOutputTracker != null) {
6161
mapOutputTracker.unregisterShuffle(shuffleId)
6262
}
63-
SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
63+
val shuffleManager = SparkEnv.get.shuffleManager
64+
if (shuffleManager != null) {
65+
shuffleManager.unregisterShuffle(shuffleId)
66+
} else {
67+
logDebug(log"Ignore remove shuffle ${MDC(SHUFFLE_ID, shuffleId)}")
68+
true
69+
}
6470
}
6571

6672
case DecommissionBlockManager =>

0 commit comments

Comments
 (0)