
Conversation

@WangGuangxin
Contributor

What changes were proposed in this pull request?

When we shuffle on indeterminate expressions such as rand() and a shuffle fetch failure happens, we may get an incorrect result, since Spark retries only the failed map tasks.

To illustrate this, suppose we have a dataset with two columns (`range(1, 5)` as `a`, `rand()` as `b`) and we shuffle by `b` using two map tasks and two reduce tasks.
[figure: two map tasks hash rows by column b into two reduce partitions]

When a fetch failure forces map task 2 to rerun, the partitions it generates may differ from the previous attempt, and we end up with a duplicate record with a = 4.
[figure: after the rerun of map task 2, the row with a = 4 appears in both reduce partitions]
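
To make the failure mode concrete, here is a toy sketch in plain Scala (no Spark; all names are illustrative). The reduce partition for a row is essentially hash(rand()) % numReducers, so a re-executed map task re-draws its random keys and may route the same rows differently:

```scala
import scala.util.Random

object RandShuffleToy {
  val numReducers = 2

  // One "map task attempt": key each row with a fresh random draw, then
  // route it to a reducer by hashing that key.
  def route(rows: Seq[Int], attemptSeed: Long): Map[Int, Seq[Int]] = {
    val rng = new Random(attemptSeed)
    rows.groupBy(_ => Math.floorMod(rng.nextDouble().##, numReducers))
  }

  def main(args: Array[String]): Unit = {
    val mapTask2Rows = Seq(3, 4)
    println(route(mapTask2Rows, attemptSeed = 1L)) // original attempt
    println(route(mapTask2Rows, attemptSeed = 2L)) // rerun after fetch failure
    // If one reducer already fetched the original attempt's output while the
    // other reads the rerun's output, a row (e.g. a = 4) can be consumed
    // twice, or not at all.
  }
}
```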

This is very similar to the Repartition+Shuffle bug fixed by #22112.
This PR tries to fix it by reusing that mechanism.

Why are the changes needed?

It fixes a data inconsistency issue.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added UT

@WangGuangxin
Contributor Author

@cloud-fan @maropu Can you please help review this?

@AmplabJenkins

Can one of the admins verify this patch?

@WangGuangxin
Contributor Author

Also cc @srowen @viirya @yaooqinn. I found a similar report from before: https://issues.apache.org/jira/browse/SPARK-24607

@srowen
Member

srowen commented Feb 15, 2022

Isn't this why you shouldn't partition, shuffle, etc. on some random value? Use a hash?

@WangGuangxin
Contributor Author

> Isn't this why you shouldn't partition, shuffle, etc. on some random value? Use a hash?

Data analysts have all kinds of needs, such as `distribute by rand()` to redistribute data evenly,
or `select * from (select concat(key1, rand()) as key1 from tbl1) a join (select key2 from tbl2) b on a.key1 = b.key2` to work around skewed data; both are valid SQL in Spark.

Both of these queries generate a HashPartitioning with non-deterministic expressions.

If we don't want to support shuffling by a random value, we should explicitly disable it.
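
For concreteness, a runnable sketch of the two patterns (the tables `tbl1`/`tbl2` and columns `key1`/`key2` are taken from the comment and assumed to exist):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()

// Pattern 1: redistribute rows evenly by a random key.
spark.sql("SELECT * FROM tbl1 DISTRIBUTE BY rand()")

// Pattern 2: salt a skewed join key with rand().
spark.sql(
  """SELECT *
    |FROM (SELECT concat(key1, rand()) AS key1 FROM tbl1) a
    |JOIN (SELECT key2 FROM tbl2) b
    |ON a.key1 = b.key2""".stripMargin)

// Both plans contain a HashPartitioning whose expressions are non-deterministic.
```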

@srowen
Member

srowen commented Feb 15, 2022

Right, shouldn't we reject it? Distributing by `hash(ID)` or similar makes more sense, not least because it is reproducible and deterministic across runs and environments.

@WangGuangxin
Contributor Author

> Right, shouldn't we reject it? Distributing by `hash(ID)` or similar makes more sense, not least because it is reproducible and deterministic across runs and environments.

Rejecting it may not be a good idea.

  1. Both Hive and Presto support patterns like `distribute by rand` or join/group-by on rand, and Spark seems intent on supporting group-by on nondeterministic expressions; see [SPARK-18969][SQL] Support grouping by nondeterministic expressions #16404. Also, some UDFs and java_method calls are marked as indeterminate, so rejecting this would mean users cannot join or group by a column generated by a UDF or java_method (see the sketch after this list).
  2. The root cause of the data inconsistency when shuffling by a rand expression is that Spark retries only the failed map tasks on a shuffle fetch failure. If we retry the whole stage, there is no problem, and we can reuse the existing logic in DAGScheduler to achieve this.
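
A small sketch of point 1 (the UDF and column names are made up; `asNondeterministic` is the real Spark API for declaring a UDF nondeterministic):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().master("local[2]").getOrCreate()

// A salting UDF explicitly marked nondeterministic -- exactly the kind of
// column users then join or group by.
val salt = udf((k: String) => k + scala.util.Random.nextInt(10)).asNondeterministic()

spark.table("tbl1").groupBy(salt(col("key1"))).count()
```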

@mridulm
Contributor

mridulm commented Feb 16, 2022

If there is a fetch failure and the parent stage is INDETERMINATE, both the parent and child stages are recomputed.
Custom RDDs can override getOutputDeterministicLevel and return the right DeterministicLevel.
See #22112 for more details.
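
A minimal sketch of that core-side hook (the RDD itself is hypothetical; `getOutputDeterministicLevel` and `DeterministicLevel` are the API introduced by #22112):

```scala
import scala.util.Random

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{DeterministicLevel, RDD}

// An RDD that keys each input row with a fresh random number. A re-run task
// produces different output, so we declare it INDETERMINATE; on a shuffle
// fetch failure the DAGScheduler then recomputes the whole stage (and its
// child stages) instead of retrying only the failed map tasks.
class RandomKeyRDD(prev: RDD[Long]) extends RDD[(Double, Long)](prev) {

  override def compute(split: Partition, context: TaskContext): Iterator[(Double, Long)] =
    firstParent[Long].iterator(split, context).map(v => (Random.nextDouble(), v))

  override protected def getPartitions: Array[Partition] = firstParent[Long].partitions

  override protected def getOutputDeterministicLevel: DeterministicLevel.Value =
    DeterministicLevel.INDETERMINATE
}
```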

@WangGuangxin
Contributor Author

> If there is a fetch failure and the parent stage is INDETERMINATE, both the parent and child stages are recomputed. Custom RDDs can override getOutputDeterministicLevel and return the right DeterministicLevel. See #22112 for more details.

Thanks for the reference. That's what this PR does for Spark SQL:
when we shuffle by rand with SQL like `distribute by rand`, the RDD generated by Spark SQL has DeterministicLevel INDETERMINATE after this PR.
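
One way to observe the effect (a sketch, not the PR's actual test: `outputDeterministicLevel` is `private[spark]`, so a check like this has to live under the `org.apache.spark` namespace):

```scala
package org.apache.spark.sql.sketch // inside the spark namespace for access

import org.apache.spark.rdd.DeterministicLevel
import org.apache.spark.sql.SparkSession

object CheckShuffleDeterminism {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val rdd = spark.sql("SELECT * FROM range(1, 5) DISTRIBUTE BY rand()")
      .queryExecution.toRdd
    // Expected to hold once this PR's change is in place.
    assert(rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
  }
}
```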

@mridulm
Contributor

mridulm commented Feb 16, 2022

I was not referring to the SQL changes per se, @WangGuangxin; I will let @srowen or @cloud-fan, etc. review that.
Specifically for the changes in core, there is already a means to provide the deterministic level; we don't need isPartitionKeyIndeterminate and related changes.

```scala
/**
 * Checks if the shuffle partitioning contains an indeterminate expression/reference.
 */
private def isPartitioningIndeterminate(partitioning: Partitioning, plan: SparkPlan): Boolean = {
  // ... (body elided in the review snippet)
}
```
Contributor

I think we need to build a framework to properly propagate column-level nondeterminism information. This function looks quite fragile.

Contributor

For example, `Filter(rand_cond, Project(a, b, c, ...))`. I think all the columns are nondeterministic after the Filter, even though the attributes a, b, and c are deterministic.
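
A sketch of that example in DataFrame form (the table `tbl` and its columns are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

val spark = SparkSession.builder().master("local[2]").getOrCreate()

// Filter(rand_cond, Project(a, b, c)): the projected expressions are all
// deterministic, but which rows survive the filter differs per task attempt.
val filtered = spark.table("tbl")
  .select(col("a"), col("b"), col("c"))
  .where(rand() < 0.5)

// So shuffling by the "deterministic" column a downstream of the filter is
// still effectively indeterminate: a re-run map task may emit different rows.
filtered.repartition(col("a"))
```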

Contributor Author

You mean `QueryPlan`'s `deterministic`? See #34470.

Contributor

That's plan level, not column level. We need something more fine-grained.

Contributor Author

OK, I'll first try to add column-level nondeterminism information before this PR.
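
Purely to illustrate the direction being discussed (this is a hypothetical shape, not Spark's actual API):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical trait: each operator reports which of its output attributes are
// deterministic. A Filter with a nondeterministic condition would report an
// empty set, since the row membership of every column becomes attempt-dependent.
trait ColumnDeterminism {
  def deterministicOutput: Set[Attribute]
}
```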

@github-actions

github-actions bot commented Jun 5, 2022

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jun 5, 2022
@github-actions github-actions bot closed this Jun 6, 2022