
Conversation

@mgaido91
Contributor

What changes were proposed in this pull request?

In Dataset.join we have a small hack for resolving column-name ambiguity in self-joins. The current code handles only EqualTo, but other conditions involving columns can appear in self-joins: in general, any BinaryComparison can be specified and faces the same issue.

The PR extends the fix to all BinaryComparisons.
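
For illustration, a minimal Scala sketch (mine, not taken from this PR or its tests) of the kind of self-join condition affected; before the fix, a comparison other than === is not disambiguated, so both column references may resolve to the same join side:

import org.apache.spark.sql.SparkSession

// Hypothetical reproduction: a self-join whose condition is a BinaryComparison
// other than EqualTo (===), which the existing hack does not handle.
val spark = SparkSession.builder().master("local[*]").appName("selfJoinSketch").getOrCreate()
val df = spark.range(10).toDF("id")

// Without the fix, both df("id") references may resolve to the same side,
// so the condition can degenerate into comparing a column with itself.
df.join(df, df("id") >= df("id")).explain(true)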

How was this patch tested?

Added a unit test.

@daniel-shields

I'm not sure that this behavior should be applied to all binary comparisons. It could result in unexpected behavior in some rare cases. For example:
df1.join(df2, df2['x'] < df1['x'])
If 'x' is ambiguous, this would result in the condition being flipped.

@SparkQA

SparkQA commented May 29, 2018

Test build #91253 has finished for PR 21449 at commit 92cb513.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor Author

@daniel-shields in that case you have two different datasets, df1 and df2, so the columns are two distinct attributes and the check a.sameRef(b) would return false. This is applied only when you have self-joins, i.e. the same dataset on both sides.
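
For illustration, a standalone sketch (mine, using catalyst's internal AttributeReference directly; not code from this PR) of why two distinct datasets never trigger the self-join path:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// Attributes created independently get distinct expression IDs, so sameRef is
// false; only a true self-join sees the same attribute on both sides.
val a = AttributeReference("x", LongType)()
val b = AttributeReference("x", LongType)()

a.sameRef(a)  // true: same underlying attribute, as in a self-join
a.sameRef(b)  // false: columns from two distinct datasets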

@daniel-shields

daniel-shields commented May 29, 2018

This case can also occur when the datasets are different but share a common lineage. Consider the following:

df = spark.range(10)
df1 = df.groupby('id').count()
df2 = df.groupby('id').sum('id')
df1.join(df2, df2['id'].eqNullSafe(df1['id'])).collect()

This currently fails with eqNullSafe, but works with ==.
The case I am worried about is when the last line looks like this instead:
df1.join(df2, df2['id'] < df1['id']).collect()
In Spark 2.3.0, this condition actually resolves to false, so the new behavior is likely preferable anyway. I think the best outcome would be for BinaryComparisons other than EqualTo and EqualNullSafe to result in analysis errors.

@mgaido91
Contributor Author

Thanks @daniel-shields, you're right. I am checking if and how this can be fixed. Thanks for catching this!

@SparkQA

SparkQA commented May 30, 2018

Test build #91298 has finished for PR 21449 at commit e8a5fa3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 30, 2018

Test build #91303 has finished for PR 21449 at commit b8d5057.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@daniel-shields

daniel-shields commented May 30, 2018

@mgaido91 I looked at the test failures and I think the changes to the Dataset.resolve method are causing havoc. Consider the Dataset.drop method with the following signature:
def drop(col: Column): DataFrame
It contains a statement that may be comparing an AttributeReference with the new metadata to one without it. The == operator on AttributeReference likely includes the metadata in the comparison.

val colsAfterDrop = attrs.filter { attr =>
  attr != expression
}.map(attr => Column(attr))

This may be resulting in columns not getting dropped. I haven't verified but this is the first thing I would check. The change to resolve may be too drastic. I think this same problem occurs in other Dataset methods as well. It may also affect methods in KeyValueGroupedDataset and RelationalGroupedDataset.
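
A standalone sketch of that suspicion (my illustration, assuming that AttributeReference's case-class equality includes the metadata field while semanticEquals ignores it):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{LongType, MetadataBuilder}

// The same logical attribute, with and without extra metadata attached.
val plain  = AttributeReference("id", LongType)()
val tagged = plain.withMetadata(new MetadataBuilder().putString("origin", "df1").build())

plain == tagged              // false: metadata participates in equality, so drop() keeps the column
plain.semanticEquals(tagged) // true: semantically still the same attribute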

@mgaido91
Contributor Author

Yes @daniel-shields, your analysis is right. The problem was that we were sometimes using == and sometimes semanticEquals, and equals has the problem you mentioned.

I think the only way to address the problem described here is to track which dataset a column is coming from. I think adding a metadata entry for it is the cleanest way. We could also add a new field to the Attribute class instead of using metadata, but honestly this way seemed cleaner to me. What do you think? Do you have other suggestions?

cc @cloud-fan @hvanhovell @gatorsmile

@cloud-fan
Contributor

This is a long-standing issue; I've seen many attempts to fix it (including my own), but none has succeeded.

The major problem is that there is no clear definition of the expected behavior, i.e. what are the semantics of Dataset.col?

Some examples:

df.select(df.col("i")) // valid

val df1 = df.filter(...)
df1.select(df.col("i")) // still valid

df.join(otherDF, df.col("i") === otherDF.col("i")) // valid

df.join(otherDF).select(df.col("i"), otherDF("i"))  // valid

val df2 = df.select(df.col("i") + 1)
df2.select(df.col("i"))   // invalid

Sometimes we can use an ancestor's column in a new Dataset and sometimes we can't. We should make the condition clear first.

@SparkQA

SparkQA commented May 31, 2018

Test build #91343 has finished for PR 21449 at commit 8e6e5c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor Author

mgaido91 commented Jun 1, 2018

Thanks for your comment @cloud-fan. I understand your point. That is quite a tricky problem, since we would probably also need to know the "DAG" of the dataframes in order to make the right decision.

But although this change is related to that problem, I think it is different and has a much smaller scope. While the metadata information could be used in many places, in this patch it is used only in the self-join case, when there is ambiguity in which column to take. The behavior in every other case is unchanged.

So after this patch, the way columns are resolved using col is unchanged. The only place where the dataset of provenance is checked is in self-joins. The goal here is only to support cases which were throwing exceptions when resolving the right column.

@cloud-fan
Contributor

My point is that we may have a different design if we want to solve this problem holistically, which may conflict with this patch. We should either show that this is in the right direction and that a future fix will not conflict with it, or come up with the final fix directly.

For example, we may want to treat df.join(df, df("id") >= df("id")) as invalid in the final design.

@mgaido91
Contributor Author

mgaido91 commented Jun 3, 2018

I see what you mean. Honestly, I have not thought through a full design for this problem (so I can't say what we should and shouldn't support), but focusing on this specific case I think that:

  • at the moment we do support self-joins (at least in the case df.join(df, df("id") >= df("id"))), so considering this invalid would be a big behavior change (potentially breaking user workflows);
  • even though such a change might be acceptable in a major release, I think the DataFrame API should support what the SQL API supports, and the SQL standard supports self-joins (using table aliases), so I do believe we should support this use case;
  • the case presented by @daniel-shields in [SPARK-24385][SQL] Resolve self-join condition ambiguity for all BinaryComparisons #21449 (comment) is, I think, a valid one without any doubt; as of now, though, we do not support it.

So I think that even in the holistic approach we shouldn't change the current behavior/approach, which is present now and which (IMHO) this patch improves.

What I do think we have to discuss, so that we don't have to change it once we want to solve the more generic issue, is how to track which dataset an attribute is coming from. Here I decided to use metadata, since it seemed to me the cleanest approach. Another approach might be to introduce a new Option field in AttributeReference holding a reference to the dataset it comes from.
For the generic solution, that might have the advantage that the provenance reference could also store some kind of DAG of the datasets this one is derived from, in order to make more complex decisions about the validity of the syntax and/or the resolution of the attribute.
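
As a rough sketch of the metadata-based tagging described above (illustration only; the key name and details are hypothetical and not the actual patch):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{LongType, MetadataBuilder}

val datasetId = "dataset-42"  // hypothetical per-Dataset identifier

// Tag an output attribute with the id of the Dataset it came from, so a later
// self-join could compare tags instead of (identical) expression IDs.
val attr = AttributeReference("id", LongType)()
val tagged = attr.withMetadata(
  new MetadataBuilder()
    .withMetadata(attr.metadata)            // keep any pre-existing metadata
    .putString("__dataset_id", datasetId)   // hypothetical provenance key
    .build())

tagged.metadata.contains("__dataset_id")    // true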

What do you think?

@cloud-fan
Contributor

This will definitely not go into 2.3.1, so we have plenty of time. I'll think more deeply about it after Spark Summit.

IMO df.join(df, df("id") >= df("id")) is ambiguous, especially when it's not an inner join.

@mgaido91
Contributor Author

mgaido91 commented Jun 4, 2018

Sure, thanks for your time.

PS: df.join(df, df("id") >= df("id")) may be ambiguous, but the example above,
df1.join(df2, df2['id'].eqNullSafe(df1['id'])).collect(), where df1 and df2 are derived from the same dataframe, is not ambiguous IMHO.

@daniel-shields

In the short term we should make the behavior of EqualTo and EqualNullSafe identical. We could do that by adding a case for EqualNullSafe that mirrors that of EqualTo.
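
Something along these lines (an illustrative sketch, not the actual change; resolveLeft and resolveRight are hypothetical stand-ins for resolving a column name against the left and right join child):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualNullSafe, EqualTo, Expression}

// Rewrite self-referencing equality predicates so each side is re-resolved
// against its own plan; EqualNullSafe simply mirrors the EqualTo case.
def disambiguate(
    condition: Expression,
    resolveLeft: String => Expression,
    resolveRight: String => Expression): Expression = condition.transform {
  case EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
    EqualTo(resolveLeft(a.name), resolveRight(b.name))
  case EqualNullSafe(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
    EqualNullSafe(resolveLeft(a.name), resolveRight(b.name))
}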

@cloud-fan
Contributor

In the short term we should make the behavior of EqualTo and EqualNullSafe identical.

This seems pretty safe and reasonable to me

@mgaido91
Contributor Author

mgaido91 commented Jun 6, 2018

@daniel-shields do you want to open a PR for that? I'll leave this PR open, as it is a more general fix, so we can continue the long-term discussion here. Do you agree with this approach @cloud-fan?

@WenboZhao
Contributor

I like the proposal by @daniel-shields. If we can get it fixed soon, we will be able to catch the Spark 2.3.2 release.

@mgaido91
Contributor Author

OK, I created #21605 for the fix proposed by @daniel-shields. I'd like to leave this one open in order to continue the discussion about a better long-term fix.

@mgaido91
Contributor Author

@cloud-fan do you have any further comments about this? Thanks.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 11, 2020
@github-actions github-actions bot closed this Jan 12, 2020