Skip to content

Commit c561e6c

Browse files
committed
[SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in Signaling.cancelOnInterrupt
## What changes were proposed in this pull request? `Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes ReplSuite unstable. This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to get the active `SparkContext` to avoid the leak. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #16825 from zsxwing/SPARK-19481.
1 parent da3dfaf commit c561e6c

File tree

5 files changed: +20 additions, −11 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -2295,6 +2295,13 @@ object SparkContext extends Logging {
22952295
getOrCreate(new SparkConf())
22962296
}
22972297

2298+
/** Return the current active [[SparkContext]] if any. */
2299+
private[spark] def getActive: Option[SparkContext] = {
2300+
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
2301+
Option(activeContext.get())
2302+
}
2303+
}
2304+
22982305
/**
22992306
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
23002307
* running. Throws an exception if a running context is detected and logs a warning if another

repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
2222
object Main extends Logging {
2323

2424
initializeLogIfNecessary(true)
25+
Signaling.cancelOnInterrupt()
2526

2627
private var _interp: SparkILoop = _
2728

repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala

Lines changed: 0 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -1027,7 +1027,6 @@ class SparkILoop(
10271027
builder.getOrCreate()
10281028
}
10291029
sparkContext = sparkSession.sparkContext
1030-
Signaling.cancelOnInterrupt(sparkContext)
10311030
sparkSession
10321031
}
10331032

repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
3030
object Main extends Logging {
3131

3232
initializeLogIfNecessary(true)
33+
Signaling.cancelOnInterrupt()
3334

3435
val conf = new SparkConf()
3536
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
@@ -108,7 +109,6 @@ object Main extends Logging {
108109
logInfo("Created Spark session")
109110
}
110111
sparkContext = sparkSession.sparkContext
111-
Signaling.cancelOnInterrupt(sparkContext)
112112
sparkSession
113113
}
114114

repl/src/main/scala/org/apache/spark/repl/Signaling.scala

Lines changed: 11 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging {
2828
* when no jobs are currently running.
2929
* This makes it possible to interrupt a running shell job by pressing Ctrl+C.
3030
*/
31-
def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {
32-
if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
33-
logWarning("Cancelling all active jobs, this can take a while. " +
34-
"Press Ctrl+C again to exit now.")
35-
ctx.cancelAllJobs()
36-
true
37-
} else {
38-
false
39-
}
31+
def cancelOnInterrupt(): Unit = SignalUtils.register("INT") {
32+
SparkContext.getActive.map { ctx =>
33+
if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
34+
logWarning("Cancelling all active jobs, this can take a while. " +
35+
"Press Ctrl+C again to exit now.")
36+
ctx.cancelAllJobs()
37+
true
38+
} else {
39+
false
40+
}
41+
}.getOrElse(false)
4042
}
4143

4244
}

Comments (0)