[SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union #15567
Conversation
Test build #67258 has finished for PR 15567 at commit
Test build #67257 has finished for PR 15567 at commit
Test build #67260 has finished for PR 15567 at commit
```scala
/**
 * [performance] Spark's internal mapPartitions method that skips closure cleaning.
 */
private[spark] def mapPartitionsInternal[U: ClassTag](
```
can we get rid of this?
There are 20+ probably valid uses of `mapPartitionsInternal`. The main problem is that changing it to `mapPartitionsWithIndexInternal` doesn't really force people to initialize the partition.
```diff
-final def setInitialValues(): Unit = {
-  initInternal()
+final def initializeStatesForPartition(partitionIndex: Int): Unit = {
```
while you are at it, it'd be great to add some comments documenting the function ...
How about just naming this "initialize"? It is fairly long right now ....
And we just document that `initialize` must be called prior to task execution on a partition.
I don't want to overload the name `initialize`, which is a little vague; how about `initStates`? Again, the issue is that even with comments we cannot force users to initialize it.
```scala
 * An integer `partitionIndex` will be made available within the scope.
 */
val partitionInitializationStatements: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty
```
Any reason you are creating this rather than just using `addMutableState`?
I'm a little worried about introducing more issues by moving `initMutableStates` out of the constructor. The current implementation at least maintains the existing behavior if we missed `initializeStatesForPartition` somewhere.
```scala
final override def deterministic: Boolean = false
final override def foldable: Boolean = false

private[this] var initialized = false
```
Should we change this to `@transient`? Then it will always get reset to false on a new partition.
will do
```diff
 */
@ExpressionDescription(
-  usage = "_FUNC_() - Returns the current partition id of the Spark task",
+  usage = "_FUNC_() - Returns the current partition id",
```
hmmm this is behavior changing, and there is some value to the old partition id ...
I'd consider introducing a new expression for the proper id and leave the old one as is.
I thought about this, but I don't think the current behavior is what users expect. This is the same issue as in `monotonically_increasing_id`.
Yeah, but it is consistent with `TaskContext.partitionId` (which is also the name of the function).
The name is `SparkPartitionID`, not `TaskContextPartitionID`. We should follow the same semantics for non-deterministic expressions.
```scala
 * This is used by non-deterministic expressions to set initial states.
 * The default implementation does nothing.
 */
def initializeStatesForPartition(partitionIndex: Int): Unit = {}
```
To make this safer, I'd create an internal variable `isInitialized`, similar to the one in non-deterministic expressions, and assert in `eval` if `isInitialized` is false.
I didn't test it. Would doing that hurt performance?
I don't think so, since it is in the interpreted path, which is already very slow. Also, in the normal case the condition will always be false, so CPU branch prediction should work its magic.
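The guard being discussed can be sketched in plain Scala. Everything below is illustrative, not Spark's actual API: `StatefulLike`, `initInternal`, and `ToyPartitionId` are made-up names modeling the idea of an internal `isInitialized` flag that is set by `initialize` and asserted on the eval path.

```scala
// Illustrative sketch (not Spark code): an isInitialized flag plus an
// assertion in eval, so forgetting to initialize fails fast.
trait StatefulLike {
  private var isInitialized = false

  // Must be called once per partition, before any row is evaluated.
  final def initialize(partitionIndex: Int): Unit = {
    initInternal(partitionIndex)
    isInitialized = true
  }

  protected def initInternal(partitionIndex: Int): Unit

  final def eval(): Any = {
    // In the normal case this condition is always false, so the branch is
    // trivially predictable on the (already slow) interpreted path.
    assert(isInitialized, "initialize(partitionIndex) must be called before eval()")
    evalInternal()
  }

  protected def evalInternal(): Any
}

// A toy partition-id expression using the guard.
class ToyPartitionId extends StatefulLike {
  private var partitionId: Int = -1
  override protected def initInternal(partitionIndex: Int): Unit =
    partitionId = partitionIndex
  override protected def evalInternal(): Any = partitionId
}
```

Calling `eval()` on an uninitialized instance then throws an `AssertionError` instead of silently returning a stale value.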
Reviewing this code makes me realize how painful it is when project/filter are just Scala functions ... it'd be much easier to review if they had methods defined (e.g. eval, or execute) ...
So my biggest question is whether you've changed all the places to call initialize where projection/predicate are used.
Test build #67270 has finished for PR 15567 at commit
Test build #67277 has finished for PR 15567 at commit
@mengxr - I think this PR will also address SPARK-14241.
SPARK-14241 doesn't just occur with union and coalesce; it also occurs with filter and probably other operations. Hopefully this PR will address all of those situations. I strongly agree with the expected semantics in the original PR message by mengxr - this has bitten me on multiple occasions.
@rxin I updated the implementation to force initialization in Projection/Expression. This will fail many tests. I fixed all in
Basically, we assume non-deterministic by default unless marked as deterministic. This will require updating all expressions but makes the code less messy.
As discussed offline, this is StatefulExpression.
Actually, Projection also needs the same trait. Shall we call it PartitionedStateful?
Test build #67907 has finished for PR 15567 at commit
I reverted the changes I made to enforce Renamed
Test build #67957 has finished for PR 15567 at commit
Test build #67959 has finished for PR 15567 at commit

Merging in master/branch-2.1. Thanks.
[SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union

## What changes were proposed in this pull request?

When a user appended a column using a "nondeterministic" function to a DataFrame, e.g., `rand`, `randn`, and `monotonically_increasing_id`, the expected semantic is the following:

- The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations.

However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values from nondeterministic columns might change if we call `union` or `coalesce` after. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column. See the unit tests below or JIRA for examples.

This PR uses the partition index from `RDD.mapPartitionWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and initialized right after object creation in `mapPartitionWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function for proper initialization.

## How was this patch tested?

Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...)

cc: rxin davies

Author: Xiangrui Meng <[email protected]>

Closes #15567 from mengxr/SPARK-14393.

(cherry picked from commit 02f2031)
Signed-off-by: Reynold Xin <[email protected]>
```diff
-child.execute().mapPartitionsInternal { iter =>
+child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
   val predicate = newPredicate(condition, child.output)
   predicate.initialize(0)
```
Just wondering why FilterExec is not using `index` to initialize the conditions?
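The pattern under discussion can be modeled in plain Scala without any Spark types (all names here, such as `ToyPredicate` and `filterPartitions`, are illustrative). The point is that a `mapPartitionsWithIndex`-style execution already has the partition index in scope, so the freshly created per-partition predicate could be initialized with `index` rather than a constant 0.

```scala
// Toy model (not Spark code) of per-partition predicate creation and
// initialization, as in mapPartitionsWithIndexInternal.
final case class ToyPredicate(seed: Long) {
  private var partitionIndex: Int = -1
  def initialize(index: Int): Unit = partitionIndex = index
  // Keep rows based on a value derived from the partition's own index.
  def eval(row: Int): Boolean = (row + seed + partitionIndex) % 2 == 0
}

def filterPartitions(partitions: Seq[Seq[Int]], seed: Long): Seq[Seq[Int]] =
  partitions.zipWithIndex.map { case (rows, index) =>
    val predicate = ToyPredicate(seed) // fresh object per partition ...
    predicate.initialize(index)        // ... initialized with the real index
    rows.filter(predicate.eval)
  }
```

With the real `index` passed to `initialize`, identical row data in different partitions can still be filtered differently, which matters for predicates containing nondeterministic expressions.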
## What changes were proposed in this pull request?

When a user appended a column using a "nondeterministic" function to a DataFrame, e.g., `rand`, `randn`, and `monotonically_increasing_id`, the expected semantic is the following:

- The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations.

However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values from nondeterministic columns might change if we call `union` or `coalesce` after. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column. See the unit tests below or JIRA for examples.

This PR uses the partition index from `RDD.mapPartitionWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and initialized right after object creation in `mapPartitionWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function for proper initialization.

## How was this patch tested?

Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...)

cc: @rxin @davies
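The core idea of the fix can be modeled in a few lines of plain Scala (a toy sketch, not Spark code; `ToyRand` and `randColumn` are made-up names): seed per-partition state from the DataFrame's own partition index rather than the running task's id. Replaying a partition with the same index then reproduces the same "random" column, which is exactly the stability that `union` and `coalesce` were violating.

```scala
import scala.util.Random

// Toy model of a nondeterministic column expression whose per-partition RNG
// is seeded by the logical partition index, not TaskContext.getPartitionId.
final class ToyRand(seed: Long) {
  private var rng: Random = _
  def initialize(partitionIndex: Int): Unit =
    rng = new Random(seed + partitionIndex)
  def eval(): Double = rng.nextDouble()
}

// Materialize the column for one partition: because the seed depends only on
// the stable partition index, re-running the partition (e.g. after coalesce
// moves it to a different task) yields identical values.
def randColumn(seed: Long, partitionIndex: Int, numRows: Int): Seq[Double] = {
  val expr = new ToyRand(seed)
  expr.initialize(partitionIndex)
  Seq.fill(numRows)(expr.eval())
}
```

Under the old `TaskContext`-based scheme, the effective seed would change whenever the task's partition id differed from the DataFrame's, producing the value changes described above.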