[SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext #5428
|
Test build #29898 has started for PR 5428 at commit |
|
Test build #29898 has finished for PR 5428 at commit
|
|
Test PASSed. |
|
@jerryshao Mind taking a look at this? It's still a WIP, as the unit tests are commented out. |
|
@tdas, can this PR resolve this issue? |
Would it be better to switch this to string interpolator style?
Yes, I will. Thanks!
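For illustration, a minimal sketch of the difference between concatenation and Scala's `s` string interpolator. The object name, values, and message text are hypothetical and are not taken from the PR diff.

```scala
object InterpolationExample {
  // Hypothetical values, used only for illustration.
  val checkpointDir = "/tmp/checkpoint"
  val attempts = 3

  // Concatenation style: the message is assembled with `+`.
  val concatenated: String =
    "Reading checkpoint from " + checkpointDir + " after " + attempts + " attempts"

  // Interpolator style: the `s` prefix substitutes $name in place.
  val interpolated: String =
    s"Reading checkpoint from $checkpointDir after $attempts attempts"
}
```

Both values hold the same string; the interpolated form is usually easier to read in log messages.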
Do you think that we should log a warning message in the case where we ignore the error?
|
It looks good to me. I am simply curious about the scenarios for this usage: is there any situation where the streaming context has failed but the Spark context still exists after a driver failure? |
This change seems unrelated to this fix. I think we can simply do a null check inside this method and create a FileSystem if needed to avoid unnecessary changes to the calls (all the fs being passed in changing to Some(fs)) -- keeps git history sane.
The reason I added this is so that we do not have to handle nulls. Dealing with nulls is severely frowned upon in Scala, and is precisely why Option was introduced. There are many places where nulls are still handled this way, and I have slowly been fixing them. I think this is a small enough change (it doesn't change functionality or existing code paths) that it is okay to do this.
BTW, this file has to change anyways in the attempt to make the semantics of read more clear.
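The two approaches discussed in this thread can be contrasted with a small sketch. `FakeFs`, `defaultFs`, and both `read` methods are hypothetical stand-ins for illustration; they are not the actual Spark or Hadoop signatures touched by the PR.

```scala
object OptionalDependencyExample {
  // Hypothetical stand-in for a handle such as a Hadoop FileSystem.
  final case class FakeFs(name: String)

  // Hypothetical fallback used when the caller supplies no handle.
  def defaultFs(): FakeFs = FakeFs("default")

  // Null-check style: the signature does not reveal that `fs` may be absent.
  def readWithNull(path: String, fs: FakeFs): String = {
    val resolved = if (fs != null) fs else defaultFs()
    s"${resolved.name}:$path"
  }

  // Option style: absence is part of the type, so callers pass Some(fs) or None.
  def readWithOption(path: String, fs: Option[FakeFs] = None): String = {
    val resolved = fs.getOrElse(defaultFs())
    s"${resolved.name}:$path"
  }
}
```

The trade-off raised in the review is visible here: the `Option` version is safer, but every existing call site has to change from `fs` to `Some(fs)`.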
|
@jerryshao , thanks, I will test this case later. |
|
@zzcclp I don't think it will solve this issue directly. But it may allow the SparkContext to be re-initialized properly before the StreamingContext is recreated from checkpoints. |
|
@ALL This is still a WIP. Adding the equivalent Java API requires refactoring the existing |
|
Test build #30389 has started for PR 5428 at commit |
|
@JoshRosen Please take a quick look at the Function. |
Some of these changes are unrelated to the PR; they just clean up the formatting of JavaAPISuite, which is quite badly formatted.
|
Test build #30389 has finished for PR 5428 at commit
|
|
Test FAILed. |
|
Jenkins, test this again. |
|
Jenkins, test this. |
|
Test build #691 has started for PR 5428 at commit |
|
Test build #691 has finished for PR 5428 at commit
|
|
Test build #692 has started for PR 5428 at commit |
|
@tdas, I tested recovering a streaming application from a checkpoint with this PR. It failed when the application uses accumulators, so this PR cannot solve SPARK-5206 directly. How can SPARK-5206 be solved? |
|
Yes, this is not intended to solve SPARK-5206. |
|
Test build #30743 has started for PR 5428 at commit |
|
Test build #30743 has finished for PR 5428 at commit
|
|
Test FAILed. |
|
Test build #30751 has started for PR 5428 at commit |
|
Test build #30751 has finished for PR 5428 at commit
|
|
Test FAILed. |
|
Test build #693 has started for PR 5428 at commit |
|
Test build #693 has finished for PR 5428 at commit
|
|
Test build #694 has started for PR 5428 at commit |
|
LGTM pending Jenkins. |
|
Test build #695 has started for PR 5428 at commit |
|
Test build #695 has finished for PR 5428 at commit
|
|
Test build #696 has started for PR 5428 at commit |
|
Test build #697 has started for PR 5428 at commit |
|
Test build #697 has finished for PR 5428 at commit
|
|
Merging this. Thanks Josh. |
|
Hi @tdas, why was this PR reverted? |
|
This PR was reverted because I had used MutableBoolean which does not seem to work well with Hadoop 1.0.4. I reopened the PR in #5773. |
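As a rough illustration of the replacement, a flag that must be mutated from inside a closure can be held in `java.util.concurrent.atomic.AtomicBoolean`, which ships with the JDK and so needs no extra library. The `wasCalled` helper below is hypothetical and is not code from the PR.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object AtomicFlagExample {
  // Runs `body`, handing it a callback, and reports whether the callback was invoked.
  def wasCalled(body: (() => Unit) => Unit): Boolean = {
    // JDK-provided mutable flag that a closure can safely capture and set.
    val called = new AtomicBoolean(false)
    body(() => called.set(true))
    called.get()
  }
}
```

A test could use this pattern to check, for example, whether a create function was invoked or skipped.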
…eated from checkpoint and existing SparkContext

Original PR #5428 got reverted due to issues between MutableBoolean and Hadoop 1.0.4 (see JIRA). This replaces MutableBoolean with AtomicBoolean.

srowen pwendell

Author: Tathagata Das <[email protected]>

Closes #5773 from tdas/SPARK-6752 and squashes the following commits:

a0c0ead [Tathagata Das] Fix for hadoop 1.0.4
70ae85b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
94db63c [Tathagata Das] Fix long line.
524f519 [Tathagata Das] Many changes based on PR comments.
eabd092 [Tathagata Das] Added Function0, Java API and unit tests for StreamingContext.getOrCreate
36a7823 [Tathagata Das] Minor changes.
204814e [Tathagata Das] Added StreamingContext.getOrCreate with existing SparkContext
|
I have revised the implementation of this PR in a followup PR #6096 |
…heckpoint and existing SparkContext

Currently, if you want to create a StreamingContext from checkpoint information, the system will create a new SparkContext. This prevents a StreamingContext from being recreated from checkpoints in managed environments where the SparkContext is precreated.

The solution in this PR: introduce the following methods on StreamingContext

1. `new StreamingContext(checkpointDirectory, sparkContext)`
   Recreate the StreamingContext from a checkpoint using the provided SparkContext.
2. `StreamingContext.getOrCreate(checkpointDirectory, sparkContext, createFunction: SparkContext => StreamingContext)`
   If a checkpoint file exists, recreate the StreamingContext using the provided SparkContext (that is, call 1.); otherwise create the StreamingContext using the provided createFunction.

TODO: the corresponding Java and Python APIs have to be added as well.

Author: Tathagata Das <[email protected]>

Closes apache#5428 from tdas/SPARK-6752 and squashes the following commits:

94db63c [Tathagata Das] Fix long line.
524f519 [Tathagata Das] Many changes based on PR comments.
eabd092 [Tathagata Das] Added Function0, Java API and unit tests for StreamingContext.getOrCreate
36a7823 [Tathagata Das] Minor changes.
204814e [Tathagata Das] Added StreamingContext.getOrCreate with existing SparkContext
Currently, if you want to create a StreamingContext from checkpoint information, the system will create a new SparkContext. This prevents a StreamingContext from being recreated from checkpoints in managed environments where the SparkContext is precreated.
The solution in this PR: introduce the following methods on StreamingContext
1. `new StreamingContext(checkpointDirectory, sparkContext)`
   Recreate the StreamingContext from a checkpoint using the provided SparkContext.
2. `StreamingContext.getOrCreate(checkpointDirectory, sparkContext, createFunction: SparkContext => StreamingContext)`
   If a checkpoint file exists, recreate the StreamingContext using the provided SparkContext (that is, call 1.); otherwise create the StreamingContext using the provided createFunction.
TODO: the corresponding Java and Python APIs have to be added as well.
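The get-or-create decision described above can be sketched without a Spark dependency. Everything in this block is a hypothetical model: `FakeSparkContext` and `FakeStreamingContext` stand in for the real classes, and "a checkpoint exists" is simplified to "the directory exists", which is an assumption and not how Spark actually reads checkpoint files.

```scala
import java.nio.file.{Files, Paths}

object GetOrCreateSketch {
  // Hypothetical stand-ins for SparkContext and StreamingContext.
  final case class FakeSparkContext(appName: String)
  final case class FakeStreamingContext(sc: FakeSparkContext, fromCheckpoint: Boolean)

  // Assumption: a checkpoint is considered present when the directory exists.
  def checkpointExists(checkpointDirectory: String): Boolean =
    Files.isDirectory(Paths.get(checkpointDirectory))

  // Mirrors the parameter order given in the PR description.
  def getOrCreate(
      checkpointDirectory: String,
      sparkContext: FakeSparkContext,
      createFunction: FakeSparkContext => FakeStreamingContext): FakeStreamingContext =
    if (checkpointExists(checkpointDirectory)) {
      // Corresponds to method 1: recreate from the checkpoint, reusing the given context.
      FakeStreamingContext(sparkContext, fromCheckpoint = true)
    } else {
      // No checkpoint: build a fresh context from the caller's function.
      createFunction(sparkContext)
    }
}
```

In both branches the caller's existing context is reused, which is the point of the change for managed environments where the SparkContext is precreated.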