[SPARK-4229] Create hadoop configuration in a consistent way #3543
Conversation
… copied from spark config. Resolved conflicts in favor of master. Conflicts: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala, streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
…ing match that of runtime env
ok to test
Test build #24053 has finished for PR 3543 at commit
Unrelated to this PR?
Pretty sure that's just diff getting confused by where the hadoop doc changes were inserted; the same lines are marked as removed lower in the diff.
LGTM. Could you rephrase the title to just say "[SPARK-4229]" instead of "Closes blah", to follow the convention used for PR titles?
I think this should be using the "hadoopConfiguration" object in the SparkContext. That has all the hadoop-related configuration already set up and should be what is automatically used. @marmbrus should have a better idea.
I seem to recall there being potential thread safety issues related to hadoop configuration objects, resulting in the need to create / clone them. Quick search turned up e.g. https://issues.apache.org/jira/browse/SPARK-2546. I'm not sure how relevant that is to all of these existing situations where new Configuration() is being called.
On Tue, Dec 9, 2014 at 5:07 PM, Tathagata Das wrote (an email copy of the review comment above, on sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala, @@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext), in #3543), quoting:

```scala
def createParquetFile[A <: Product : TypeTag](
    path: String,
    allowExisting: Boolean = true,
    conf: Configuration = new Configuration()): SchemaRDD = {
```

> I think this should be using the "hadoopConfiguration" object in the SparkContext. That has all the hadoop related configuration already setup and should be what is automatically used. @marmbrus should have a better idea.

Reply to this email directly or view it on GitHub: https://github.com/apache/spark/pull/3543/files#r21571141
@koeninger The issue that you linked is concerned with thread-safety issues when multiple threads concurrently modify the same Configuration instance.
It turns out that there's another, older thread-safety issue related to Configuration's constructor not being thread-safe due to non-thread-safe static state: https://issues.apache.org/jira/browse/HADOOP-10456. This has been fixed in some newer Hadoop releases, but since it was only reported in April I don't think we can ignore it. As a result, https://issues.apache.org/jira/browse/SPARK-1097 implements a workaround which synchronizes on an object before calling new Configuration. Currently, I think the extra synchronization logic is only implemented in HadoopRDD, but it should probably be used everywhere just to be safe. I think that HadoopRDD was the "highest-risk" place where we might have many threads creating Configurations at the same time, which is probably why that patch's author didn't add the synchronization everywhere.
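As a rough sketch of the workaround being described (funneling every constructor call through one JVM-wide lock, as the SPARK-1097 patch did in HadoopRDD): the class `ConfigLike` below is a hypothetical stand-in for Hadoop's `Configuration`, since the real class isn't assumed to be on the classpath here, and the lock name simply mirrors `HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK`.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfUtil {
    // Hypothetical stand-in for org.apache.hadoop.conf.Configuration.
    public static final class ConfigLike {
        private final Map<String, String> props = new HashMap<>();
        public void set(String key, String value) { props.put(key, value); }
        public String get(String key) { return props.get(key); }
    }

    // One JVM-wide lock guarding construction: per HADOOP-10456,
    // Configuration's constructor touches static state that is not
    // thread-safe, so all constructor calls are serialized through
    // a single monitor.
    private static final Object CONFIGURATION_INSTANTIATION_LOCK = new Object();

    public static ConfigLike newConfiguration() {
        synchronized (CONFIGURATION_INSTANTIATION_LOCK) {
            return new ConfigLike();
        }
    }
}
```

Hiding the lock behind a `newConfiguration()` method like this is what lets call sites stay oblivious to the workaround.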
So let me see if I have things straight:
- Currently, the code uses new Configuration() as a default, which may have some thread safety issues due to the constructor.
- My original patch uses SparkHadoopUtil.get.conf, which is a singleton, so it should reduce the constructor thread safety problem but increase the problems if the hadoop configuration is modified. It also won't do the right thing for people who have altered the sparkConf, which makes it no good. (I haven't run into this in personal usage of the patched version, because I always pass in a complete sparkConf via properties rather than setting it in code.)
- @tdas suggested using this.sparkContext.hadoopConfiguration. This will use the "right" spark config, but may have thread safety issues both at construction (at the time the spark context is created) and if the configuration is modified.

So: use tdas' suggestion and add a HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized block to SparkHadoopUtil.newConfiguration? And are people out of luck if they have code that used to work because they were modifying new blank instances of Configuration, rather than the now-shared one?
If we're going to use CONFIGURATION_INSTANTIATION_LOCK in multiple places, then I think it makes sense to move CONFIGURATION_INSTANTIATION_LOCK into SparkHadoopUtil, since that seems like a more logical place for it to live than HadoopRDD. I like the idea of hiding the synchronization logic behind a method like SparkHadoopUtil.newConfiguration.
Regarding whether SparkContext.hadoopConfiguration will lead to thread-safety issues: I did a bit of research on this while developing a workaround for the other configuration thread-safety issues and wrote a series of comments citing cases of code "in the wild" that depend on mutating SparkContext.hadoopConfiguration. For example, there are a lot of snippets of code that look like this:
```scala
sc.hadoopConfiguration.set("es.resource", "syslog/entry")
output.saveAsHadoopFile[ESOutputFormat]("-")
```

In Spark 1.x, I don't think we'll be able to safely transition away from using the shared SparkContext.hadoopConfiguration instance, since there's so much existing code that relies on the current behavior.
However, I think that there's much less risk of running into thread-safety issues as a result of this. It seems fairly unlikely that you'll have multiple threads mutating the shared configuration in the driver JVM. In executor JVMs, most Hadoop InputFormats (and other classes) don't mutate configurations, so we shouldn't run into issues; for those that do mutate, users can always enable the cloneConf setting.
In a nutshell, I don't think that the shared sc.hadoopConfiguration is a good design that we would choose if we were redesigning it, but using it here seems consistent with the behavior that we have elsewhere in Spark as long as we're stuck with this for 1.x.
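To make the cloneConf-style defense concrete, here is a hedged sketch. `ConfigLike` is again a hypothetical stand-in for Hadoop's `Configuration` (whose copy constructor is what the real cloning path uses); the point is only that a per-task copy isolates mutation from the shared instance.

```java
import java.util.HashMap;
import java.util.Map;

public class CloneConfSketch {
    // Hypothetical stand-in for org.apache.hadoop.conf.Configuration.
    public static final class ConfigLike {
        private final Map<String, String> props;
        public ConfigLike() { this.props = new HashMap<>(); }
        // Mirrors Configuration's copy constructor, used when cloning.
        public ConfigLike(ConfigLike other) { this.props = new HashMap<>(other.props); }
        public void set(String key, String value) { props.put(key, value); }
        public String get(String key) { return props.get(key); }
    }

    public static void main(String[] args) {
        // Shared driver-side config, as with sc.hadoopConfiguration.
        ConfigLike shared = new ConfigLike();
        shared.set("es.resource", "syslog/entry");

        // With per-task cloning enabled, each task works on its own copy,
        // so an InputFormat that mutates the config cannot affect other
        // tasks or the shared instance.
        ConfigLike perTask = new ConfigLike(shared);
        perTask.set("es.resource", "other/index");

        System.out.println(shared.get("es.resource"));
        System.out.println(perTask.get("es.resource"));
    }
}
```

The trade-off, as noted above, is the cost of copying the configuration on every task, which is why it's an opt-in setting rather than the default.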
> And people are out of luck if they have code that used to work because they were modifying new blank instances of Configuration, rather than the now-shared one?
I don't think that users were able to access the old new Configuration() instance; I think that the only code that could have modified this would be the Parquet code.
…tion where possible, synchronize new Configuration
Test build #24512 has finished for PR 3543 at commit
Jenkins is failing on org.apache.spark.scheduler.SparkListenerSuite's "local metrics" test. I can't reproduce those test failures locally.

I'll look at those tests' code in a little bit to see if I can figure out whether they're prone to random flakiness. I don't recall seeing flakiness from these tests before, so this seems like it's worth investigating. FYI, I have an open PR that tries to address some of the causes of streaming test flakiness: #3687

Jenkins, retest this please.

(Might as well have Jenkins run this again just to see whether the failure is nondeterministic.)
Test build #24551 has finished for PR 3543 at commit
Jenkins, retest this please.

@JoshRosen I leave it to you to figure out changes related to the …
Test build #24789 has finished for PR 3543 at commit
Just for posterity: I think a couple of changes sneaked in between my original change being sent as a PR and it being committed, making my code miss some … At that time, I explicitly avoided changing default arguments to a few methods (since my thinking was that, since it's an argument, the user should know what he's doing). But I don't really have an opinion about what's the right approach there, and changing it is fine with me. I also don't have enough background to comment on the thread-safety issues (others have looked at it in much more depth than I have)...

Ping. What's the status here?
I think it's mostly a question of whether committers are comfortable with a … At this point it'd probably need another audit of the code to see if there …

On Tue, Mar 17, 2015 at 10:41 PM, Michael Armbrust wrote: …
Another ping. @koeninger @tdas @JoshRosen should we move forward with this patch, or close it since it's mostly gone stale at this point?
As far as I know, it's still an issue - by default, any checkpoint that …
Hey @koeninger, looking at this patch again, I would like to absorb the streaming changes at the very least. Those issues still exist in streaming, and would be a good fix to have. So mind closing this PR and issuing a new PR with only the fixes to the streaming API?
@koeninger Ping.
Just to be clear, are we talking about removing just the one-line changes to SQLContext and JavaSQLContext? Everything else in the PR I think is necessary in order to make the changes in streaming.
Other way round. Just keep the changes in StreamingContext, DStream, and PairDStreamFunctions.
Except that those streaming changes call into SparkHadoopUtil, which was changed in that PR for thread safety reasons. HadoopRDD was changed so there was only one lock being used. At that point the only thing left is doc changes and the sql changes.
Aah right, makes sense. That definitely complicates things, because that is the hard question: whether to put that lock in or not. @JoshRosen is the best person to answer that. Unfortunately he is swamped :(
Haven't dug into this in detail yet, but it's possible that the bug that motivated the …
Test build #38584 has finished for PR 3543 at commit
If we're talking about this issue, https://issues.apache.org/jira/browse/HADOOP-11209, then unless there's something arcane about Hadoop's JIRA, it looks like that was only resolved in April, for 2.7. @tdas, if you think we're better off / not worse off with at least having the streaming-only changes in for Spark 1.5, I can put in a narrower PR for that and we can punt on the thread safety issues for now.
Yeah, maybe that is a good idea for now. In fact, SparkHadoopUtil.get.conf calls newConfiguration only once, and then the configuration is used everywhere. So newConfiguration() will be called only once in the lifetime of the application, and the likelihood of the race condition causing a problem here is really small. So I think it's fine for now to just address this. The way I would do this is to make the JIRA specific to streaming only (set component and title accordingly), and file a separate JIRA (if not already present) for a possible problem in newConfiguration(), linking it to the Hadoop JIRA. Does that make sense? @JoshRosen Any thoughts?
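The once-per-application behavior being described can be sketched as follows. This is a toy model, not the real SparkHadoopUtil: the configuration is created on first access under a lock and then cached, so the constructor runs exactly once per JVM and the window for the constructor race is tiny.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class HadoopUtilSketch {
    // Counts constructor invocations so the once-only behavior is observable.
    public static final AtomicInteger constructions = new AtomicInteger(0);

    private static Map<String, String> conf;

    private static Map<String, String> newConfiguration() {
        constructions.incrementAndGet();
        Map<String, String> m = new HashMap<>();
        m.put("fs.defaultFS", "file:///");
        return m;
    }

    // Lazily create the shared configuration exactly once; the same lock
    // also covers the constructor call, addressing the HADOOP-10456-style
    // race discussed above.
    public static synchronized Map<String, String> conf() {
        if (conf == null) {
            conf = newConfiguration();
        }
        return conf;
    }
}
```

Every caller after the first gets the cached instance, which is why the remaining race risk is limited to application startup.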
Changing this JIRA to be streaming-only and making another for thread safety issues still leaves all the inconsistent calls to new Configuration in SQL, and probably other places (at a quick grep: external/flume, external/twitter, and maybe core). I'll get a PR with changes only to streaming/; let me know what you guys want to do as far as JIRAs.
Streaming-only PR is at #7772.
Fair point. How about making subtasks of the JIRA for different components, and then using those JIRA ids?
Added subtasks, changed the title of #7772 to refer to the streaming subtask JIRA ID. Let me know if you see anything on that that needs tweaking before the 1.5 freeze date.
Okay, #7772 has been merged. Mind removing the streaming changes from this PR to make this cleaner?
@koeninger would you mind updating this patch per @tdas' suggestion?
@andrewor14 master has diverged sufficiently from this PR that I don't think it's useful to keep it merge-able. If we think someone's willing to accept the changes to core and sql, those subtasks should be revisited with this general approach as a basis.
OK, makes sense. Can you close this PR for now then? If there's interest we can always reopen it against the latest master.
No description provided.