Skip to content

Conversation

@MaxGekk
Copy link
Collaborator

@MaxGekk MaxGekk commented Apr 29, 2020

What changes were proposed in this pull request?

The results are generated on

Item Description
Region us-west-2 (Oregon)
Instance r3.xlarge
AMI ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5)
Java OpenJDK 64-Bit Server VM 1.8.0_242 and OpenJDK 64-Bit Server VM 11.0.6+10

How was this patch tested?

By running the benchmark via:

SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.DateTimeRebaseBenchmark"

@cloud-fan cloud-fan merged commit 5a3009b into cloud-fan:perf Apr 30, 2020
@MaxGekk MaxGekk deleted the spark-31606-cloud-fan-perf2 branch June 5, 2020 19:48
cloud-fan pushed a commit that referenced this pull request Apr 24, 2023
…onnect

### What changes were proposed in this pull request?
Implement Arrow-optimized Python UDFs in Spark Connect.

Please see apache#39384 for motivation and  performance improvements of Arrow-optimized Python UDFs.

### Why are the changes needed?
Parity with vanilla PySpark.

### Does this PR introduce _any_ user-facing change?
Yes. In Spark Connect Python Client, users can:

1. Set `useArrow` parameter True to enable Arrow optimization for a specific Python UDF.

```sh
>>> df = spark.range(2)
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#18 AS <lambda>(id)#16]
+- ArrowEvalPython [<lambda>(id#14L)#15], [pythonUDF0#18], 200
   +- *(1) Range (0, 2, step=1, splits=1)
```

2. Enable `spark.sql.execution.pythonUDF.arrow.enabled` Spark Conf to make all Python UDFs Arrow-optimized.

```sh
>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> df.select(udf(lambda x : x + 1)('id')).show()
+------------+
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#30 AS <lambda>(id)apache#28]
+- ArrowEvalPython [<lambda>(id#26L)apache#27], [pythonUDF0#30], 200
   +- *(1) Range (0, 2, step=1, splits=1)

```

### How was this patch tested?
Parity unit tests.

Closes apache#40725 from xinrong-meng/connect_arrow_py_udf.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
cloud-fan pushed a commit that referenced this pull request Jul 9, 2025
…pressions in `buildAggExprList`

### What changes were proposed in this pull request?

Trim aliases before matching Sort/Having/Filter expressions with semantically equal expression from the Aggregate below in `buildAggExprList`

### Why are the changes needed?
For a query like:
```
SELECT course, year, GROUPING(course) FROM courseSales GROUP BY CUBE(course, year) ORDER BY GROUPING(course)
```

Plan after `ResolveReferences` and before `ResolveAggregateFunctions` looks like:

```
!Sort [cast((shiftright(tempresolvedcolumn(spark_grouping_id#18L, spark_grouping_id, false), 1) & 1) as tinyint) AS grouping(course)#22 ASC NULLS FIRST], true
 +- Aggregate [course#19, year#20, spark_grouping_id#18L], [course#19, year#20, cast((shiftright(spark_grouping_id#18L, 1) & 1) as tinyint) AS grouping(course)#21 AS grouping(course)#15]
....
```
Because aggregate list has `Alias(Alias(cast((shiftright(spark_grouping_id#18L, 1) & 1) as tinyint))` expression from `SortOrder` won't get matched as semantically equal and it will result in adding an unnecessary `Project`. By stripping inner aliases from aggregate list (that are going to get removed anyways in `CleanupAliases`) we can match `SortOrder` expression and resolve it as `grouping(course)#15`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51339 from mihailotim-db/mihailotim-db/fix_inner_aliases_semi_structured.

Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Aug 19, 2025
…onicalized expressions

### What changes were proposed in this pull request?

Make PullOutNonDeterministic use canonicalized expressions to dedup group and  aggregate expressions. This affects pyspark udfs in particular. Example:

```
from pyspark.sql.functions import col, avg, udf

pythonUDF = udf(lambda x: x).asNondeterministic()

spark.range(10)\
.selectExpr("id", "id % 3 as value")\
.groupBy(pythonUDF(col("value")))\
.agg(avg("id"), pythonUDF(col("value")))\
.explain(extended=True)
```

Currently results in a plan like this:

```
Aggregate [_nondeterministic#15](#15), [_nondeterministic#15 AS dummyNondeterministicUDF(value)#12, avg(id#0L) AS avg(id)#13, dummyNondeterministicUDF(value#6L)#8 AS dummyNondeterministicUDF(value)#14](#15%20AS%20dummyNondeterministicUDF(value)#12,%20avg(id#0L)%20AS%20avg(id)#13,%20dummyNondeterministicUDF(value#6L)#8%20AS%20dummyNondeterministicUDF(value)#14)
+- Project [id#0L, value#6L, dummyNondeterministicUDF(value#6L)#7 AS _nondeterministic#15](#0L,%20value#6L,%20dummyNondeterministicUDF(value#6L)#7%20AS%20_nondeterministic#15)
   +- Project [id#0L, (id#0L % cast(3 as bigint)) AS value#6L](#0L,%20(id#0L%20%%20cast(3%20as%20bigint))%20AS%20value#6L)
      +- Range (0, 10, step=1, splits=Some(2))
```

and then it throws:

```
[[MISSING_AGGREGATION] The non-aggregating expression "value" is based on columns which are not participating in the GROUP BY clause. Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(value)" if you do not care which of the values within a group is returned. SQLSTATE: 42803
```

- how canonicalized fixes this:
  -  nondeterministic PythonUDF expressions always have distinct resultIds per udf
  - The fix is to canonicalize the expressions when matching. Canonicalized means that we're setting the resultIds to -1, allowing us to dedup the PythonUDF expressions.
- for deterministic UDFs, this rule does not apply and "Post Analysis" batch extracts and deduplicates the expressions, as expected

### Why are the changes needed?

- the output of the query with the fix applied still makes sense - the nondeterministic UDF is invoked only once, in the project.

### Does this PR introduce _any_ user-facing change?

Yes, it's additive, it enables queries to run that previously threw errors.

### How was this patch tested?

- added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52061 from benrobby/adhoc-fix-pull-out-nondeterministic.

Authored-by: Ben Hurdelhey <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants