
Commit 7286988

[SPARK-10649] [STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs

The job group and job description are passed through thread-local properties and are inherited by child threads. In Spark Streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(), which may not make sense:

1. Job group: This is mainly used for cancelling a group of jobs together. Cancelling streaming jobs this way does not make sense, as the effect would be unpredictable, and it is not a valid use case anyway; to cancel a streaming context, call streamingContext.stop().

2. Job description: This is used to attach readable descriptions to jobs in the UI. The description of the thread that calls streamingContext.start() is not useful for the streaming jobs: it does not make sense for all of them to share the same description, and that description may not be related to streaming at all.

The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description are explicitly removed in the thread that starts the streaming scheduler, so that subsequent child threads do not inherit them. The scheduler is also started in a new child thread, so that setting the job group and description for streaming does not change those properties in the thread that called streamingContext.start().

Author: Tathagata Das <[email protected]>

Closes #8781 from tdas/SPARK-10649.

1 parent 7c4f852
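For background on why the inheritance happens at all: Spark keeps local properties such as the job group and job description in an InheritableThreadLocal, so any thread spawned after the properties are set receives them. The following is a minimal, self-contained sketch of that mechanism, not Spark code — the demo object is hypothetical; only the property key `spark.jobGroup.id` (SparkContext.SPARK_JOB_GROUP_ID) is taken from Spark.

```scala
import java.util.Properties

// A toy model of Spark's local-property propagation: the properties live in
// an InheritableThreadLocal, so child threads see a copy of whatever the
// parent thread had set at spawn time.
object LocalPropertyInheritanceDemo {
  private val localProps = new InheritableThreadLocal[Properties] {
    // On the master branch, Spark clones the parent's properties for each child.
    override def childValue(parent: Properties): Properties = {
      val cloned = new Properties()
      cloned.putAll(parent)
      cloned
    }
    override def initialValue(): Properties = new Properties()
  }

  def main(args: Array[String]): Unit = {
    localProps.get().setProperty("spark.jobGroup.id", "non-streaming")

    val child = new Thread("child") {
      override def run(): Unit = {
        // Prints "non-streaming": the child inherited a property it never set.
        println(localProps.get().getProperty("spark.jobGroup.id"))
      }
    }
    child.start()
    child.join()
  }
}
```

This is exactly the behavior the commit works around: a thread started by the streaming scheduler would otherwise carry the job group and description of whoever called streamingContext.start().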

File tree: 4 files changed (+126 −4 lines)

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 59 additions & 0 deletions

```diff
@@ -21,6 +21,7 @@ package org.apache.spark.util
 import java.util.concurrent._
 
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 
@@ -86,4 +87,62 @@ private[spark] object ThreadUtils {
     val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
     Executors.newSingleThreadScheduledExecutor(threadFactory)
   }
+
+  /**
+   * Run a piece of code in a new thread and return the result. Exception in the new thread is
+   * thrown in the caller thread with an adjusted stack trace that removes references to this
+   * method for clarity. The exception stack traces will look like the following:
+   *
+   *   SomeException: exception-message
+   *     at CallerClass.body-method (sourcefile.scala)
+   *     at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
+   *     at CallerClass.caller-method (sourcefile.scala)
+   *     ...
+   */
+  def runInNewThread[T](
+      threadName: String,
+      isDaemon: Boolean = true)(body: => T): T = {
+    @volatile var exception: Option[Throwable] = None
+    @volatile var result: T = null.asInstanceOf[T]
+
+    val thread = new Thread(threadName) {
+      override def run(): Unit = {
+        try {
+          result = body
+        } catch {
+          case NonFatal(e) =>
+            exception = Some(e)
+        }
+      }
+    }
+    thread.setDaemon(isDaemon)
+    thread.start()
+    thread.join()
+
+    exception match {
+      case Some(realException) =>
+        // Remove the part of the stack that shows method calls into this helper method.
+        // This means drop everything from the top until the stack element
+        // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
+        val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+
+        // Remove the part of the new thread's stack that shows method calls from this helper method.
+        val extraStackTrace = realException.getStackTrace.takeWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName))
+
+        // Combine the two stack traces, with a placeholder just specifying that there
+        // was a helper method used, without any further details of the helper.
+        val placeHolderStackElem = new StackTraceElement(
+          s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
+          " ", "", -1)
+        val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+        // Update the stack trace and rethrow the exception in the caller thread.
+        realException.setStackTrace(finalStackTrace)
+        throw realException
+      case None =>
+        result
+    }
+  }
 }
```
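To make the new helper concrete, here is a hedged usage sketch (not part of the commit). Since ThreadUtils is private[spark], the sketch assumes it is compiled in a file under the org.apache.spark package; the object name, thread names, and exception message are all illustrative.

```scala
package org.apache.spark.demo // needed because ThreadUtils is private[spark]

import org.apache.spark.util.ThreadUtils

object RunInNewThreadDemo {
  def main(args: Array[String]): Unit = {
    // The body executes on the named thread; its result returns to the caller.
    val name = ThreadUtils.runInNewThread("demo-thread") {
      Thread.currentThread().getName
    }
    println(name) // prints "demo-thread"

    // An exception thrown in the body is rethrown here, in the calling thread,
    // with the body's and the caller's frames stitched together around the
    // "... run in separate thread using org.apache.spark.util.ThreadUtils ..."
    // placeholder frame.
    try {
      ThreadUtils.runInNewThread("failing-thread") {
        throw new IllegalStateException("boom") // illustrative message
      }
    } catch {
      case e: IllegalStateException => e.printStackTrace()
    }
  }
}
```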

core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala

Lines changed: 23 additions & 1 deletion

```diff
@@ -20,8 +20,9 @@ package org.apache.spark.util
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 
@@ -66,4 +67,25 @@ class ThreadUtilsSuite extends SparkFunSuite {
     val futureThreadName = Await.result(f, 10.seconds)
     assert(futureThreadName === callerThreadName)
   }
+
+  test("runInNewThread") {
+    import ThreadUtils._
+    assert(runInNewThread("thread-name") { Thread.currentThread().getName } === "thread-name")
+    assert(runInNewThread("thread-name") { Thread.currentThread().isDaemon } === true)
+    assert(
+      runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon } === false
+    )
+    val uniqueExceptionMessage = "test" + Random.nextInt()
+    val exception = intercept[IllegalArgumentException] {
+      runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage) }
+    }
+    assert(exception.asInstanceOf[IllegalArgumentException].getMessage === uniqueExceptionMessage)
+    assert(exception.getStackTrace.mkString("\n").contains(
+      "... run in separate thread using org.apache.spark.util.ThreadUtils ...") === true,
+      "stack trace does not contain expected place holder"
+    )
+    assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
+      "stack trace contains unexpected references to ThreadUtils"
+    )
+  }
 }
```

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 12 additions & 3 deletions

```diff
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -588,12 +588,20 @@ class StreamingContext private[streaming] (
     state match {
       case INITIALIZED =>
         startSite.set(DStream.getCreationSite())
-        sparkContext.setCallSite(startSite.get)
         StreamingContext.ACTIVATION_LOCK.synchronized {
           StreamingContext.assertNoOtherContextIsActive()
           try {
             validate()
-            scheduler.start()
+
+            // Start the streaming scheduler in a new thread, so that thread local properties
+            // like call sites and job groups can be reset without affecting those of the
+            // current thread.
+            ThreadUtils.runInNewThread("streaming-start") {
+              sparkContext.setCallSite(startSite.get)
+              sparkContext.clearJobGroup()
+              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+              scheduler.start()
+            }
             state = StreamingContextState.ACTIVE
           } catch {
             case NonFatal(e) =>
@@ -618,6 +626,7 @@ class StreamingContext private[streaming] (
     }
   }
 
+
   /**
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
```
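A sketch of the user-visible effect of this change (not part of the commit; the master URL, app name, host, port, and group name are illustrative, and the placeholder socket stream is there only because start() requires at least one output operation):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative configuration.
val conf = new SparkConf().setMaster("local[2]").setAppName("demo")
val ssc = new StreamingContext(conf, Seconds(1))

// Placeholder source and output operation so the context can start.
ssc.socketTextStream("localhost", 9999).foreachRDD(rdd => rdd.count())

// Set a job group in the calling thread, as a user application might.
ssc.sparkContext.setJobGroup("my-group", "my description", interruptOnCancel = true)

ssc.start() // the scheduler now starts on "streaming-start" with the group cleared

// The calling thread keeps its own properties, while the streaming jobs
// no longer inherit "my-group" or "my description".
assert(ssc.sparkContext.getLocalProperty("spark.jobGroup.id") == "my-group")
```

The new test in StreamingContextSuite below verifies exactly this behavior from both sides: the streaming jobs see cleared properties, and the caller's properties are untouched.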

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 32 additions & 0 deletions

```diff
@@ -180,6 +180,38 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     assert(ssc.scheduler.isStarted === false)
   }
 
+  test("start should set job group and description of streaming jobs correctly") {
+    ssc = new StreamingContext(conf, batchDuration)
+    ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
+    val sc = ssc.sc
+
+    @volatile var jobGroupFound: String = ""
+    @volatile var jobDescFound: String = ""
+    @volatile var jobInterruptFound: String = ""
+    @volatile var allFound: Boolean = false
+
+    addInputStream(ssc).foreachRDD { rdd =>
+      jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+      jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+      allFound = true
+    }
+    ssc.start()
+
+    eventually(timeout(10 seconds), interval(10 milliseconds)) {
+      assert(allFound === true)
+    }
+
+    // Verify streaming jobs have expected thread-local properties
+    assert(jobGroupFound === null)
+    assert(jobDescFound === null)
+    assert(jobInterruptFound === "false")
+
+    // Verify current thread's thread-local properties have not changed
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
+  }
 
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
```
