@HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented Jun 7, 2020

What changes were proposed in this pull request?

This PR proposes to remove the unnecessary projection in grouped and co-grouped pandas UDFs. The projection can cause an analysis failure when the grouping column is specified with a different letter case than the one specified in the return schema.

Currently, a projection is added up front in grouped and co-grouped pandas UDFs to keep the grouping keys. For example,

from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()

adds a projection that includes the grouping keys:

== Parsed Logical Plan ==
'FlatMapGroupsInPandas ['COLUMN], my_pandas_udf(COLUMN#166L, Score#167L), [COLUMN#193, Score#194]
+- 'Project ['COLUMN, column#166L, Score#167L]  # <---- Here
...

which later causes the reference resolution failure:

pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
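
As a rough illustration of why the extra projection makes the name ambiguous, here is a minimal pure-Python sketch of case-insensitive name resolution. It is not Spark's analyzer; `resolve` and `AmbiguousReference` are hypothetical names, and attributes are modelled as plain strings.

```python
class AmbiguousReference(Exception):
    """Raised when a name matches more than one candidate attribute."""


def resolve(name, attributes, case_sensitive=False):
    """Return the single attribute matching `name`; raise if none or several match."""
    def norm(s):
        return s if case_sensitive else s.lower()

    matches = [a for a in attributes if norm(a) == norm(name)]
    if not matches:
        raise LookupError(f"cannot resolve '{name}' given input columns: {attributes}")
    if len(matches) > 1:
        raise AmbiguousReference(
            f"Reference '{name}' is ambiguous, could be: {', '.join(matches)}."
        )
    return matches[0]


# Child output without the extra projection: exactly one match.
print(resolve("COLUMN", ["column", "Score"]))  # column

# Child output with the grouping key projected in front: two matches.
try:
    resolve("COLUMN", ["COLUMN", "column", "Score"])
except AmbiguousReference as e:
    print(e)
```

With case-insensitive matching, both the projected key and the original column answer to `COLUMN`, so the lookup cannot pick one.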

In fact, we don't need to add the grouping keys at all, because grouped and co-grouped pandas UDFs always take all columns as input.

After this fix, it will be as below:

== Parsed Logical Plan ==
'FlatMapGroupsInPandas ['COLUMN], my_pandas_udf(column#0L, Score#1L), [column#9, Score#10]
+- LogicalRDD [column#0L, Score#1L], false

Note that we'll still add the projection for non-deterministic grouping expressions, to avoid evaluating them multiple times, which would produce different results.
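
To make the non-deterministic case concrete, here is a small pure-Python sketch (not Spark code). `nondeterministic_key` stands in for an expression such as `rand(42)`, and the two "consumers" (partitioning and sorting) are an assumption chosen for illustration.

```python
import random


def nondeterministic_key(row, rng):
    """Stand-in for a non-deterministic grouping expression such as rand(42)."""
    return rng.random()


rows = [{"column": i, "score": float(i)} for i in range(5)]

# Without a projection: each consumer re-evaluates the expression,
# so the two consumers see different keys for the same rows.
rng = random.Random(42)
keys_for_partitioning = [nondeterministic_key(r, rng) for r in rows]
keys_for_sorting = [nondeterministic_key(r, rng) for r in rows]
print(keys_for_partitioning == keys_for_sorting)  # False

# With a projection: evaluate once, store the value as a column,
# and let every consumer read that column.
rng = random.Random(42)
projected = [dict(r, _nondeterministic=nondeterministic_key(r, rng)) for r in rows]
keys_a = [r["_nondeterministic"] for r in projected]
keys_b = [r["_nondeterministic"] for r in projected]
print(keys_a == keys_b)  # True
```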

Why are the changes needed?

This change will fix two things:

  • Implementation consistency w.r.t. other grouping expressions.
  • A bug related to case sensitivity, see below.

Does this PR introduce any user-facing change?

Yes,

from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()

df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
df2 = spark.createDataFrame([(1, 1)], ("column", "value"))

df1.groupby("COLUMN").cogroup(
    df2.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, df1.schema).show()

Before:

pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;

pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
:- Project [COLUMN#9L, column#9L, value#10L]
:  +- LogicalRDD [column#9L, value#10L], false
+- Project [COLUMN#13L, column#13L, value#14L]
   +- LogicalRDD [column#13L, value#14L], false

After:

+------+-----+
|column|Score|
+------+-----+
|     1|  0.5|
+------+-----+

+------+-----+
|column|value|
+------+-----+
|     2|    2|
+------+-----+
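
The second result above (`column` = 2, `value` = 2) follows from the co-grouped function adding the two groups column by column, key included. A minimal pure-Python sketch of that co-grouping, with rows as plain dicts standing in for pandas DataFrames and with hypothetical helper names (`group_by`, `cogroup_apply`, `add_rows`):

```python
from collections import defaultdict


def group_by(rows, key):
    """Bucket rows by the value of `key`."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups


def cogroup_apply(left_rows, right_rows, key, func):
    """Pair up the groups that share a key and apply `func` to each pair."""
    left, right = group_by(left_rows, key), group_by(right_rows, key)
    out = []
    for k in sorted(left.keys() | right.keys()):
        out.extend(func(left.get(k, []), right.get(k, [])))
    return out


def add_rows(l, r):
    # Mimics `l + r` on two aligned frames: every column, including the key, is summed.
    return [{c: a[c] + b[c] for c in a} for a, b in zip(l, r)]


df1 = [{"column": 1, "value": 1}]
df2 = [{"column": 1, "value": 1}]
print(cogroup_apply(df1, df2, "column", add_rows))  # [{'column': 2, 'value': 2}]
```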

How was this patch tested?

def fromExpression(expr: Expression): NamedExpression = expr match {
  case ne: NamedExpression => ne
  case _: Expression => Alias(expr, toPrettySQL(expr))()
}
Member Author

I will send another PR to use this in other places, for example,

val namedDistinctExpressions = distinctExpressions.map {
  case ne: NamedExpression => ne
  case other => Alias(other, other.toString)()
}

val aliasedExprs = aggregateExprs.map {
  case ne: NamedExpression => ne
  case e => Alias(e, e.toString)()
}

private[this] def alias(expr: Expression): NamedExpression = expr match {
  case u: UnresolvedAttribute => UnresolvedAlias(u)
  case expr: NamedExpression => expr
  case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
    UnresolvedAlias(a, Some(Column.generateAlias))
  case expr: Expression => Alias(expr, toPrettySQL(expr))()
}

and possibly at:

case expr: Expression => Alias(expr, toPrettySQL(expr))()

I can also leave this util out for now if anyone is unsure about it.

Contributor

@TJX2014 TJX2014 Jun 7, 2020

I find that org.apache.spark.sql.Dataset#groupBy(cols: Column*) is invoked through py4j instead of groupBy(col1: String, cols: String*). Is it possible to change only the parameters sent from the Python side so that groupBy(col1: String, cols: String*) is invoked? That may also be helpful for this JIRA :-)

Member Author

Yeah, let me take a look at that separately, in a separate JIRA.

@SparkQA

SparkQA commented Jun 7, 2020

Test build #123598 has finished for PR 28745 at commit 9522493.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 7, 2020

Test build #123599 has finished for PR 28745 at commit 2800eb2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}.copy(child = newChild)
projectGroupingExprs(a, a.groupingExpressions)

case f: FlatMapGroupsInPandas if f.groupingExprs.exists(!_.deterministic) =>
Member Author

So, basically this is for the case when grouping expressions are non-deterministic:

== Physical Plan ==
FlatMapGroupsInPandas [_nondeterministic#14], my_pandas_udf(column#4L, score#6), [column#12, score#13]
+- *(2) Sort [_nondeterministic#14 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_nondeterministic#14, 200), true, [id=#19]
      +- *(1) Project [column#4L, score#6, rand(42) AS _nondeterministic#14]  # <--- here to evaluate non-deterministic expression only once.
...

@cloud-fan
Contributor

will this cause perf regression? e.g. if grouping expr is expensive, with the Project we only need to evaluate it once.

@SparkQA

SparkQA commented Jun 9, 2020

Test build #123698 has finished for PR 28745 at commit 2bf09c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

HyukjinKwon commented Jun 10, 2020

will this cause perf regression? e.g. if grouping expr is expensive, with the Project we only need to evaluate it once.

I would say this is kind of a design choice. The alternative is to add the projection for all grouping expressions, which on the other hand requires keeping more intermediate data.

This PR aligns the implementation with the existing grouping expressions. It shouldn't be aligned with object expressions, because grouped and co-grouped UDFs have to pass the key separately for the UDF to use, which object expressions don't.
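
The trade-off discussed here can be sketched in plain Python (not Spark code). `CountingKey` is a hypothetical expensive grouping expression, and the number of consumers that need the key is an assumption made only for illustration.

```python
class CountingKey:
    """Hypothetical expensive grouping expression that counts its evaluations."""

    def __init__(self):
        self.calls = 0

    def __call__(self, row):
        self.calls += 1
        return row["column"] % 2


rows = [{"column": i} for i in range(4)]
consumers = 3  # assumed number of operators that need the key

# No projection: every consumer re-evaluates the expression per row.
inline = CountingKey()
for _ in range(consumers):
    _ = [inline(r) for r in rows]

# Projection: evaluate once per row, then carry the result as an extra column.
projected_key = CountingKey()
projected = [dict(r, _key=projected_key(r)) for r in rows]
for _ in range(consumers):
    _ = [r["_key"] for r in projected]

print(inline.calls, projected_key.calls)  # 12 4
print(len(projected[0]) - len(rows[0]))   # 1 extra value carried per row
```

Fewer evaluations are paid for with one more value per row kept in the intermediate data, which is the design choice being weighed.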

@HyukjinKwon
Member Author

HyukjinKwon commented Jun 10, 2020

BTW, I skimmed Aggregate related analysis and I think we're all good with the current change if I didn't miss anything.

@HyukjinKwon
Member Author

Okay, let me try to just work around the problem itself. This change is too invasive.

BryanCutler pushed a commit that referenced this pull request Jun 10, 2020
…he case sensitivity in grouped and cogrouped pandas UDFs

### What changes were proposed in this pull request?

This is another approach to fix the issue. See the previous attempt, #28745. It was too invasive, so I took a more conservative approach.

This PR proposes to resolve the grouping attributes separately first, so that they can be referenced without ambiguity when `FlatMapGroupsInPandas` and `FlatMapCoGroupsInPandas` are resolved.

Previously,

```python
from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()
```

failed as below:

```
pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
```
because the unresolved `COLUMN` in `FlatMapGroupsInPandas` doesn't know which reference to take from the child projection.

After this fix, the child projection is resolved first together with the grouping keys, and the attribute passed to `FlatMapGroupsInPandas` as the grouping key is selected positionally from the child projection.
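
A minimal pure-Python sketch of the positional selection described above. This is not the actual analyzer code: `resolve_name` is a hypothetical helper, and attributes are modelled as `(name, expression id)` pairs.

```python
def resolve_name(name, attributes):
    """Case-insensitively resolve `name`; fail unless exactly one attribute matches."""
    matches = [a for a in attributes if a[0].lower() == name.lower()]
    if len(matches) != 1:
        raise ValueError(f"'{name}' matched {len(matches)} attributes")
    return matches[0]


# Attributes are modelled as (name, expression id) pairs, like column#9L.
child_output = [("column", 9), ("Score", 10)]
grouping_names = ["COLUMN"]

# Step 1: resolve the grouping keys against the child output, where each
# name is still unique, and build the projection as keys followed by all columns.
keys = [resolve_name(n, child_output) for n in grouping_names]
projection = keys + child_output

# Step 2: pick the grouping attributes by position, not by name.
grouping_attrs = projection[:len(grouping_names)]
print(grouping_attrs)  # [('column', 9)]

# Looking the key up by name in the projection would hit both copies.
try:
    resolve_name("COLUMN", projection)
except ValueError as e:
    print(e)  # 'COLUMN' matched 2 attributes
```

Because the keys sit at known positions at the front of the projection, no second name lookup is needed, and the duplicate names never have to be disambiguated.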

### Why are the changes needed?

To resolve grouping keys correctly.

### Does this PR introduce _any_ user-facing change?

Yes,

```python
from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
@pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
    return pdf.assign(Score=0.5)

df.groupby('COLUMN').apply(my_pandas_udf).show()
```

```python
df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
df2 = spark.createDataFrame([(1, 1)], ("column", "value"))

df1.groupby("COLUMN").cogroup(
    df2.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, df1.schema).show()
```

Before:

```
pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;
```

```
pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
:- Project [COLUMN#9L, column#9L, value#10L]
:  +- LogicalRDD [column#9L, value#10L], false
+- Project [COLUMN#13L, column#13L, value#14L]
   +- LogicalRDD [column#13L, value#14L], false
```

After:

```
+------+-----+
|column|Score|
+------+-----+
|     1|  0.5|
+------+-----+
```

```
+------+-----+
|column|value|
+------+-----+
|     2|    2|
+------+-----+
```

### How was this patch tested?

Unit tests were added, and the change was manually tested.

Closes #28777 from HyukjinKwon/SPARK-31915-another.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Bryan Cutler <[email protected]>
HyukjinKwon added a commit that referenced this pull request Jun 11, 2020
…he case sensitivity in grouped and cogrouped pandas UDFs

holdenk pushed a commit to holdenk/spark that referenced this pull request Jun 25, 2020
…he case sensitivity in grouped and cogrouped pandas UDFs

@HyukjinKwon HyukjinKwon deleted the SPARK-31915 branch July 27, 2020 07:44