[SPARK-21977] SinglePartition optimizations break certain Streaming Stateful Aggregation requirements #19196
Conversation
Test build #81652 has finished for PR 19196 at commit
Test build #81656 has finished for PR 19196 at commit
Test build #81657 has finished for PR 19196 at commit
```scala
override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
  case ss: StatefulOperator =>
    val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
    val keys = ss.keyExpressions
```
Another option is to not expose keyExpressions in StatefulOperator, but instead use the requiredChildDistribution field to get the required key expressions and partitioning.
I think that is a better idea.
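For illustration, a hedged sketch of what that alternative could look like, pieced together only from the APIs mentioned in this thread. The object name, the equality checks, and the ShuffleExchange usage are assumptions, not the merged implementation:

```scala
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.streaming.StatefulOperator

// Sketch: derive the keys from requiredChildDistribution instead of exposing
// keyExpressions, inserting a shuffle only for children that don't already
// have the partitioning a stateful operator needs across restarts.
object EnsureStatefulOpPartitioningSketch extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case so: StatefulOperator =>
      val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions
      val newChildren = so.requiredChildDistribution.zip(so.children).map {
        case (AllTuples, child) if child.outputPartitioning != SinglePartition =>
          ShuffleExchange(SinglePartition, child)
        case (ClusteredDistribution(keys), child)
            if child.outputPartitioning != HashPartitioning(keys, numPartitions) =>
          ShuffleExchange(HashPartitioning(keys, numPartitions), child)
        case (_, child) => child
      }
      so.withNewChildren(newChildren)
  }
}
```

A real rule would compare partitionings via `satisfies` plus partition counts rather than plain equality; the crude comparison here is just to keep the sketch short.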
```scala
trait StatefulOperator extends SparkPlan {
  def stateInfo: Option[StatefulOperatorStateInfo]

  def keyExpressions: Seq[Attribute]
```
we don't need to expose this if we don't want to
Test build #81690 has finished for PR 19196 at commit
```diff
  }

- override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations
+ override def preparations: Seq[Rule[SparkPlan]] = Seq(
```
This is an odd break-up of the line. How about:

```scala
override def preparations: Seq[Rule[SparkPlan]] =
  Seq(state, EnsureStatefulOpPartitioning) ++ super.preparations
```
tdas left a comment
Some cleanup and code deduping required, but overall it's in the right direction.
```scala
object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
  // Needs to be transformUp to avoid extra shuffles
  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case ss: StatefulOperator =>
```
nit: why ss? how about so or op or stateOp
```scala
if (streamDeathCause != null) {
  throw streamDeathCause
}
if (!isActive) return
```
+1 good catch
```scala
numOutputRows += 1
row +: Option(savedState).toSeq
val hasInput = iter.hasNext
if (!hasInput && keyExpressions.isEmpty) {
```
add docs on why we are doing this. similar to the docs in other places related to batch aggregation.
there weren't any docs in batch :)
```scala
 * `coalesce(1)`, which has several optimizations regarding [[SinglePartition]], and a 0 partition
 * parent RDD.
 */
class NonLocalRelationSource(spark: SparkSession) extends Source {
```
The point of this source is basically to create empty batches; local/non-local are just internal details, so it should be named accordingly.
The docs should explain what it does, not why it does it the way it does. It really does not matter that a local relation is not the right thing to use.
```scala
  CheckLastBatch((0, 0, 2), (1, 1, 3)))
}

private def checkAggregationChain(
```
what does it check about the aggregation chain? add docs for any such complex functions
```scala
  spark.table("agg_test").as[Long],
  1L)

inputSource.addData(2, 3)
```
This is a lot of duplicate code. I am sure you can create shortcuts like AddData and AddFileData for this source, and then use testStream(). All the checkAggregationChain calls can be put inside an AssertOnQuery.
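As a sketch of that suggestion, assuming this lives inside a StreamTest suite (where the AddData trait is in scope); `BlockRDDBackedSource` and its `addData`/`getOffset` members are assumed names for this PR's test source:

```scala
import org.apache.spark.sql.execution.streaming.{Offset, Source, StreamExecution}

// Hypothetical testStream() action modeled on StreamTest's AddData trait.
case class AddBlockData(source: BlockRDDBackedSource, data: Int*) extends AddData {
  override def addData(query: Option[StreamExecution]): (Source, Offset) = {
    source.addData(data: _*)        // push the new rows into the source
    (source, source.getOffset.get)  // the offset testStream() should wait to reach
  }
}
```

With that, each step becomes `AddBlockData(inputSource, 2, 3)` followed by a `CheckLastBatch(...)`, and the plan assertions can move into `AssertOnQuery`.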
```scala
  }
}

test("SPARK-21977: coalesce(1) with 0 partition RDD should be repartitioned accordingly") {
```
what does "accordingly" mean? this test name can be improved.
```scala
  .outputMode("complete")
  .queryName("agg_test")
  .option("checkpointLocation", tempDir.getAbsolutePath)
  .start()
```
this query code can be deduped into a function
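A sketch of the kind of helper being suggested; the name, the `Long` element type, and the memory-sink choice are assumptions based on the snippets visible in this thread:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper that dedupes the repeated writeStream boilerplate.
private def startAggQuery(df: Dataset[Long], checkpointDir: String): StreamingQuery =
  df.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("agg_test")
    .option("checkpointLocation", checkpointDir)
    .start()
```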
```scala
  assert(e.getMessage === "The output mode of function should be append or update")
}

test("SPARK-21977: coalesce(1) should still be repartitioned when it has keyExpressions") {
```
What changed code paths does this test cover that are not already covered by the other tests you added?
Test build #81812 has finished for PR 19196 at commit
```scala
def addData(data: Int*): Unit = {
  if (streamLock.getCount == 0) {
    streamLock = new CountDownLatch(1)
```
This is complicated. See how AddFileData is implemented. It's much simpler.
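For comparison, a hedged sketch of the simpler shape being suggested; `blocks`, `currentOffset`, and `storeAsBlock` are assumed members of the test source, not code from this PR:

```scala
// Append under the source's own lock and advance the offset; getOffset then
// reports the new batch, with no CountDownLatch handshake required.
def addData(data: Int*): Unit = synchronized {
  blocks ++= data.map(storeAsBlock)  // storeAsBlock: assumed helper that
                                     // materializes the ints as block IDs
  currentOffset = LongOffset(currentOffset.offset + 1)
}
```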
```scala
.coalesce(1)
.groupBy()
.count()
.as[Long]
```
nit: collapse this query, and at other places; it's making the tests look unnecessarily long.
```scala
  }
}

test("SPARK-21977: coalesce(1) should still be repartitioned when it has keyExpressions") {
```
what is keyExpressions? how about non-empty grouping keys.
```scala
  }
}

test("SPARK-21977: coalesce(1) with 0 partition RDD should be repartitioned to 1") {
```
Can you add more docs to explain this test? This is testing a complicated edge case, so more docs are necessary.
```scala
override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
  val rdd = new BlockRDD[Int](spark.sparkContext, blocks.toArray)
    .map(i => InternalRow(i)) // we don't really care about the values in this test
```
we do care about the values in the test that has .groupBy('a % 1)
not really, since the grouping key is constant
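A one-liner makes the point: with `'a % 1`, the grouping key evaluates to the same value (0) for every row, so the concrete input values cannot change which group a row lands in:

```scala
Seq(1, 2, 3).map(_ % 1)  // List(0, 0, 0): every row falls into a single group
```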
Test build #81860 has finished for PR 19196 at commit
retest this please
Test build #81887 has finished for PR 19196 at commit
```scala
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
import org.apache.spark.sql.test.SharedSQLContext

class IncrementalExecutionRulesSuite extends SparkPlanTest with SharedSQLContext {
```
How about making this the EnsureStatefulOpPartitioningSuite?
This pattern is followed by many other optimization rules (PropagateEmptyRelationSuite, CollapseProjectSuite...)
```scala
  }
}

case class TestOperator(
```
This can be within the above class; then it won't pollute the general namespace.
Name it TestStatefulOperator to make it more specific than just "test operator". And add docs saying what it is used for.
```scala
  keys => SinglePartition,
  expectShuffle = false)

private def testEnsureStatefulOpPartitioning(
```
nit: add some docs specifying what it tests.
```scala
testEnsureStatefulOpPartitioning(
  "AllTuples with coalesce(1) doesn't need Exchange",
  baseDf.coalesce(1).queryExecution.sparkPlan,
  keys => AllTuples,
```
nit: add requiredDistribution = and expectedPartitioning = for greater readability
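Applied to the call above, with the parameter names the reviewer suggests (the names are the suggestion, not the code as merged):

```scala
testEnsureStatefulOpPartitioning(
  "AllTuples with coalesce(1) doesn't need Exchange",
  baseDf.coalesce(1).queryExecution.sparkPlan,
  requiredDistribution = keys => AllTuples,
  expectedPartitioning = keys => SinglePartition,
  expectShuffle = false)
```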
@tdas Addressed
Test build #81938 has finished for PR 19196 at commit
retest this please
Test build #81942 has finished for PR 19196 at commit
retest this please
Test build #81946 has finished for PR 19196 at commit
Test build #81947 has finished for PR 19196 at commit
Test build #81950 has finished for PR 19196 at commit
```scala
}

/** Used to emulate a `StatefulOperator` with the given requiredDistribution. */
case class TestStatefulOperator(
```
this has to live outside the test suite, otherwise we get weird failed to makeCopy failures as seen here:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81950/testReport/org.apache.spark.sql.streaming/EnsureStatefulOpPartitioningSuite/ClusteredDistribution_generates_Exchange_with_HashPartitioning/
LGTM.
Test build #81954 has finished for PR 19196 at commit
Thanks! Merging to master
What changes were proposed in this pull request?
This is a bit hard to explain as there are several issues here; I'll try my best. Here are the requirements:
1. A streaming Source that can produce an input RDD with 0 partitions
2. A streaming query that requires state (mapGroupsWithState, groupBy.count, ...), and coalesces to 1 partition
The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems:

Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 0 partitions, nothing will be executed, and the state store will not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger.

Symptom 2. Let's say that there was data. Then in this case, if you stop your stream, replace `coalesce(1)` with `coalesce(2)`, and restart your stream, the stream will fail, because `spark.sql.shuffle.partitions - 1` StateStores will fail to find their delta files. A minimal sketch of a query shape that hits these paths is shown below.
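For concreteness, this sketch is pieced together from the test snippets in this thread; the source and checkpoint names are assumptions:

```scala
// A streaming aggregation over a coalesce(1)'d child: SinglePartition
// satisfies the aggregation's required distribution, so no Exchange is
// inserted before the stateful operators.
val counts = inputStream   // some streaming Dataset[Int]
  .coalesce(1)
  .groupBy()
  .count()

counts.writeStream
  .format("memory")
  .queryName("agg_test")
  .outputMode("complete")
  .option("checkpointLocation", checkpointDir)
  .start()
```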
To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies:

If the grouping expressions are empty:
a) AllTuples distribution
b) a single physical partition

If the grouping expressions are non-empty:
a) Clustered distribution
b) `spark.sql.shuffle.partitions` number of partitions

This must hold whether or not `coalesce(1)` exists in the plan, and whether or not the input RDD for the trigger has any data.
Once you fix the above problem by adding an Exchange to the plan, you come across the following bug: if you call `coalesce(1).groupBy().count()` on a streaming DataFrame and you have a trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count and there is no data. This row then gets stored in `StateStoreSaveExec`, overwriting and losing the previous counts.

How was this patch tested?
Regression tests