[SPARK-10649][STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs #8781
Changes from all commits: df6f029, 5b6ab28, 0553664, 1b347d7, c4534fd, cc90b2f, 2afc50e, d8600cf, 7550490, f84f479
```diff
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}

 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
```
```diff
@@ -588,12 +588,20 @@ class StreamingContext private[streaming] (
     state match {
       case INITIALIZED =>
         startSite.set(DStream.getCreationSite())
-        sparkContext.setCallSite(startSite.get)
         StreamingContext.ACTIVATION_LOCK.synchronized {
           StreamingContext.assertNoOtherContextIsActive()
           try {
             validate()
-            scheduler.start()
+
+            // Start the streaming scheduler in a new thread, so that thread local properties
+            // like call sites and job groups can be reset without affecting those of the
+            // current thread.
+            ThreadUtils.runInNewThread("streaming-start") {
```
Contributor: this block definitely requires a paragraph comment. Someone new to the code will have no idea why we need to do these things in a new thread.
```diff
+              sparkContext.setCallSite(startSite.get)
```
|
Contributor: any reason why we need to move this in here? It's an atomic reference, so it doesn't really matter which thread reads it, right?

Contributor (author): Because this sets a thread-local variable.

Contributor: an inheritable thread-local variable, so it still doesn't matter (anyway, we can just keep this change, not a big deal, mainly just wondering).
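The inheritance semantics being debated here can be shown with a plain JVM `InheritableThreadLocal` (Spark keeps its local properties in one). The following is a hedged Java sketch with made-up names, not Spark code: a child thread snapshots the parent's value at creation time, and later writes in the child never flow back to the parent, which is why mutating the properties in a fresh thread leaves the caller's thread untouched.

```java
import java.util.concurrent.atomic.AtomicReference;

public class InheritDemo {
    // Illustrative stand-in for Spark's local-properties storage.
    static final InheritableThreadLocal<String> PROP = new InheritableThreadLocal<>();

    // Returns {value seen by the child, parent's value after the child ran}.
    static String[] run() throws InterruptedException {
        PROP.set("parent-group");
        AtomicReference<String> seenByChild = new AtomicReference<>();
        Thread child = new Thread(() -> {
            seenByChild.set(PROP.get()); // inherited snapshot from creation time
            PROP.set("child-group");     // mutates only the child's copy
        });
        child.start();
        child.join();
        return new String[] { seenByChild.get(), PROP.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] r = run();
        System.out.println(r[0]); // parent-group (inherited by the child)
        System.out.println(r[1]); // parent-group (parent unaffected by the child's write)
    }
}
```

So the reviewer is right that the child inherits the value either way; the author's point is that `setCallSite` writes a thread-local, and doing that write in the new thread keeps the caller's copy clean.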
```diff
+              sparkContext.clearJobGroup()
+              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
```
|
Member: You can use …

Member: Is it necessary to set …

Contributor: agreed

Contributor (author): Basically I don't want to rely on the default value of this parameter, and want to explicitly specify that it should not interrupt.
```diff
+              scheduler.start()
+            }
             state = StreamingContextState.ACTIVE
           } catch {
             case NonFatal(e) =>
```
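The `ThreadUtils.runInNewThread` call in the diff above runs the setup in a named thread and surfaces any failure back in the caller. A minimal Java analogue of that shape (an illustrative sketch only; Spark's actual Scala implementation also splices the caller's stack trace into rethrown exceptions, which is what the "magic number 1" nit later in this thread is about) could look like:

```java
import java.util.concurrent.atomic.AtomicReference;

public class RunInNewThread {
    // Hypothetical simplified analogue of Spark's ThreadUtils.runInNewThread:
    // execute the body in a fresh named thread so its thread-local writes stay
    // there, block the caller until it finishes, then rethrow any failure.
    static void runInNewThread(String name, Runnable body) throws Exception {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread t = new Thread(body, name);
        t.setUncaughtExceptionHandler((thread, e) -> failure.set(e));
        t.start();
        t.join(); // caller waits, so the call still behaves synchronously
        if (failure.get() != null) {
            throw new Exception("exception in thread " + name, failure.get());
        }
    }

    public static void main(String[] args) throws Exception {
        StringBuilder log = new StringBuilder();
        runInNewThread("streaming-start", () -> log.append("started"));
        System.out.println(log); // started
    }
}
```

The caller still blocks until the body completes, so from `start()`'s perspective nothing changes except that the thread-local mutations land in the short-lived thread.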
|
```diff
@@ -618,6 +626,7 @@ class StreamingContext private[streaming] (
     }
   }

   /**
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
```
```diff
@@ -180,6 +180,38 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     assert(ssc.scheduler.isStarted === false)
   }

+  test("start should set job group and description of streaming jobs correctly") {
+    ssc = new StreamingContext(conf, batchDuration)
+    ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
+    val sc = ssc.sc
+
+    @volatile var jobGroupFound: String = ""
+    @volatile var jobDescFound: String = ""
+    @volatile var jobInterruptFound: String = ""
+    @volatile var allFound: Boolean = false
+
+    addInputStream(ssc).foreachRDD { rdd =>
+      jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+      jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+      allFound = true
+    }
+    ssc.start()
+
+    eventually(timeout(10 seconds), interval(10 milliseconds)) {
+      assert(allFound === true)
+    }
```
Contributor: is it possible to use semaphores here or some kind of locking to make the test more robust? I'm worried that it could become another flaky test that we have to fix eventually anyway.

Contributor (author): How can volatile lead to flakiness?

Contributor: if for some reason we don't execute …

Contributor (author): Well, you have to set some time limit no matter what method you use to wait, don't you?

Contributor: Not necessarily. If it hangs forever, Jenkins will time it out anyway. In general I'm not a huge fan of eventually's, because these are usually the ones we end up having to fix later. If you guess a timeout wrong, you'll have to try a slightly higher one after breaking many builds. In this case it's probably OK to keep this as is, but I'm just saying I would have written the test differently.

Contributor (author): Well, in both cases, one has to go and fix the test :) Anyway, not going into the argument of eventually vs. blocking permanently: I think it's better to use eventually with conservative timeouts than to block completely. I am fairly certain that this test is fine. Other tests in this suite also use the same eventually and they have not been flaky (the few times StreamingContextSuite has been flaky, it was for one test which had a real bug, not for eventually). I can make the timeout even more conservative, no harm in that.
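The semaphore/locking alternative the reviewer has in mind could look like the following plain-Java sketch (hypothetical names, not the suite's actual code): the volatile flag plus polling `eventually` is replaced by a `CountDownLatch`, so the waiting thread blocks until signalled, with the timeout acting only as a safety bound rather than a poll interval.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWait {
    // Returns true if the simulated batch job signalled completion in time.
    static boolean runAndWait() throws InterruptedException {
        CountDownLatch allFound = new CountDownLatch(1);
        Thread batchJob = new Thread(() -> {
            // ... read the job-group / description properties here ...
            allFound.countDown(); // signal completion instead of setting a volatile flag
        });
        batchJob.start();
        // Blocks until signalled; returns false only if the bound is exceeded.
        boolean signalled = allFound.await(10, TimeUnit.SECONDS);
        batchJob.join();
        return signalled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndWait()); // true
    }
}
```

The trade-off debated above stands either way: both approaches need an upper time bound somewhere, but the latch wakes up immediately on success instead of on the next poll tick.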
```diff
+
+    // Verify streaming jobs have expected thread-local properties
+    assert(jobGroupFound === null)
+    assert(jobDescFound === null)
+    assert(jobInterruptFound === "false")
+
+    // Verify current thread's thread-local properties have not changed
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
+  }
+
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
```
nit: could you add a comment about the magic number 1, such as: remove "java.lang.Thread.getStackTrace"?