[SPARK-28917][CORE] Synchronize access to RDD mutable state. #25951
Conversation
RDD dependencies, partitions, and storageLevel can be simultaneously accessed and mutated by user threads and Spark's scheduler threads, so access must be thread-safe. In particular, as partitions and dependencies are lazily initialized, before this change they could get initialized multiple times, leaving the scheduler with an inconsistent view of the pending stages and causing it to get stuck.
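For context, the shape of the fix is roughly the following (a trimmed-down sketch, not the exact RDD.scala code; the field and lock names mirror the patch):

```scala
import org.apache.spark.Partition

// Trimmed-down sketch of the pattern this PR applies in RDD.scala: a shared
// lock guards lazily-initialized state so that user threads and scheduler
// threads initialize it exactly once.
abstract class LazyRddState {
  // Serializable lock object; executors may hold a reference to it when an
  // RDD is serialized into a task. (The deprecation of this constructor is
  // discussed further down in this thread.)
  private val stateLock = new Integer(0)

  @volatile private var partitions_ : Array[Partition] = _

  protected def getPartitions: Array[Partition]

  final def partitions: Array[Partition] = {
    // Double-checked locking: the volatile read is cheap once the field is
    // set; the lock ensures getPartitions runs at most once.
    if (partitions_ == null) {
      stateLock.synchronized {
        if (partitions_ == null) {
          partitions_ = getPartitions
        }
      }
    }
    partitions_
  }
}
```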
    def debugSelf(rdd: RDD[_]): Seq[String] = stateLock.synchronized {
      import Utils.bytesToString

      val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
Hi, @squito. Shall we use getStorageLevel instead of accessing storageLevel here? Then it seems we wouldn't need stateLock.synchronized for this debugSelf.
good point, thanks
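For reference, the suggested shape would look roughly like this (a hypothetical revision adapted from the diff above; the remaining debug fields are omitted):

```scala
// Hypothetical revision: reading through the synchronized getter means
// debugSelf itself no longer needs a stateLock.synchronized block.
def debugSelf(rdd: RDD[_]): Seq[String] = {
  import Utils.bytesToString

  val persistence =
    if (rdd.getStorageLevel != StorageLevel.NONE) rdd.getStorageLevel.description
    else ""
  Seq(persistence) // remaining debug output omitted from this sketch
}
```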
      /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
    - def getStorageLevel: StorageLevel = storageLevel
    + def getStorageLevel: StorageLevel = stateLock.synchronized { storageLevel }
Synchronizing calls on simple getters always raises flags for me. Is this really safe?
For example, in one of the places that calls this (getOrCompute): what happens if the storage level changes after this value is returned, but before the call to blockManager.getOrElseUpdate actually does its thing?
I will be honest, I don't really have a good understanding of how the mutability of storageLevel could be an actual problem. I do know of an instance where moving a .cache() before the creation of a Future which submitted a job fixed a hanging job. Unfortunately I have very little info about what else was going on in that job.
I have audited the uses of storageLevel, like getOrCompute, and I think it's OK -- there it's passing in a local copy, and anyway that whole function runs on the executor, where the storageLevel is immutable.
Another tricky one is DAGScheduler.getCacheLocs, which reads both the storage level and rdd.partitions. That gets exposed via SparkContext.getPreferredLocs, and is used in CoalescedRDD and PartitionerAwareUnionRDD. But I still can't come up with an explanation for the behavior we see. I will try to poke some more.
I could also submit a change which just covers partitions and dependencies for now, since that is clear, though I'd like to understand the problem w/ storageLevel as well.
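For illustration, the "local copy" pattern described above looks roughly like this (a hypothetical helper, not the actual getOrCompute code):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object StorageLevelSnapshot {
  // The mutable field is read once through the synchronized getter, and all
  // later logic uses that snapshot, so a concurrent persist() cannot change
  // what this method observes mid-flight.
  def describePersistence(rdd: RDD[_]): String = {
    val level = rdd.getStorageLevel // synchronized read; snapshot taken here
    if (level != StorageLevel.NONE) s"RDD ${rdd.id} persisted at ${level.description}"
    else s"RDD ${rdd.id} is not persisted"
  }
}
```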
Yeah, what is this racing with? By itself, I'm not sure this forces something to set this before something else needs to read it.
Test build #111485 has finished for PR 25951 at commit
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
    - logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    + logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + s") as input to " +
You could use interpolation for consistency
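That is, something like the following (hypothetical; the tail of the message after "as input to" is an assumption, since the diff above is truncated):

```scala
// Hypothetical: the same log line written with interpolation throughout.
// `shuffleDep` in the tail is assumed from surrounding DAGScheduler context.
logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " +
  s"shuffle ${shuffleDep.shuffleId}")
```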
Retest this please.

Test build #111633 has finished for PR 25951 at commit

I spent a while trying to figure out where a race on

Test build #111698 has finished for PR 25951 at commit

Test build #111713 has finished for PR 25951 at commit

Test build #111751 has finished for PR 25951 at commit
vanzin left a comment:
Looks good.
| test("reference partitions inside a task") { | ||
| // Run a simple job which just makes sure there is no failure if we touch rdd.partitions | ||
| // inside a task. This requires the stateLock to be serializable. This is very convoluted | ||
| // use case, its just a check for backwards-compatibility after the fix for SPARK-28917. |
it's
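For reference, a sketch of how the full test body might look (simplified; assumes the suite mixes in LocalSparkContext and its `sc` variable, as other core suites do):

```scala
test("reference partitions inside a task") {
  sc = new SparkContext("local", "test")
  val rdd1 = sc.parallelize(1 to 10, 1)
  // Referencing rdd1 inside the closure serializes the RDD -- including its
  // stateLock -- into the task, so the lock object must be serializable.
  val rdd2 = rdd1.map { x => rdd1.partitions(0); x }
  rdd2.count()
}
```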
Test build #111788 has finished for PR 25951 at commit

Merging to master / 2.4.
RDD dependencies and partitions can be simultaneously accessed and mutated by user threads and Spark's scheduler threads, so access must be thread-safe. In particular, as partitions and dependencies are lazily initialized, before this change they could get initialized multiple times, leaving the scheduler with an inconsistent view of the pending stages and causing it to get stuck. Tested with existing unit tests. Closes #25951 from squito/SPARK-28917. Authored-by: Imran Rashid <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]> (cherry picked from commit 0da667d) Signed-off-by: Marcelo Vanzin <[email protected]>
RDD dependencies and partitions can be simultaneously accessed and mutated by user threads and Spark's scheduler threads, so access must be thread-safe. In particular, as partitions and dependencies are lazily initialized, before this change they could get initialized multiple times, leaving the scheduler with an inconsistent view of the pending stages and causing it to get stuck. Tested with existing unit tests. Closes apache#25951 from squito/SPARK-28917. Authored-by: Imran Rashid <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
     * The use of Integer is simply so this is serializable -- executors may reference the shared
     * fields (though they should never mutate them, that only happens on the driver).
     */
    private val stateLock = new Integer(0)
The Integer constructor has been deprecated already, see https://docs.oracle.com/javase/9/docs/api/java/lang/Integer.html. This yields the warning:

RDD.scala:240: constructor Integer in class Integer is deprecated: see corresponding Javadoc for more information.

Is it possible to replace it with something else?
I tried to eliminate the warning in #27399
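One way to keep the lock serializable without the deprecated constructor is an anonymous Serializable instance (a sketch; not necessarily what #27399 does):

```scala
// Anonymous serializable object: no deprecated API, still safe to ship to
// executors when the RDD is serialized into a task.
private val stateLock = new Serializable {}
```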