From 26f3089cc551659c5b148828e82336022ff48aa7 Mon Sep 17 00:00:00 2001
From: Enrico Minack
Date: Thu, 26 Jan 2023 10:43:24 +0900
Subject: [PATCH 1/2] [SPARK-42168][3.2][SQL][PYTHON] Fix required child distribution of FlatMapCoGroupsInPandas (as in CoGroup)

Make `FlatMapCoGroupsInPandas` (used by PySpark) report its required child distribution as `HashClusteredDistribution`, rather than `ClusteredDistribution`. That is the same distribution as reported by `CoGroup` (used by Scala).

This allows the `EnsureRequirements` rule to correctly recognize that `FlatMapCoGroupsInPandas` requiring `HashClusteredDistribution(id, day)` is not compatible with `HashPartitioning(day, id)`, while `ClusteredDistribution(id, day)` is compatible with `HashPartitioning(day, id)`.

The following example returns an incorrect result in Spark 3.0, 3.1, and 3.2.

```Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

val ids = 1000
val days = 1000
val parts = 10

val id_df = spark.range(ids)
val day_df = spark.range(days).withColumnRenamed("id", "day")
val id_day_df = id_df.join(day_df)
// these redundant aliases are needed to work around bug SPARK-42132
val left_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("left").as("side")).repartition(parts).cache()
val right_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("right").as("side")).repartition(parts).cache()  //.withColumnRenamed("id", "id2")

// note the column order is different to the groupBy("id", "day") column order below
val window = Window.partitionBy("day", "id")

case class Key(id: BigInt, day: BigInt)
case class Value(id: BigInt, day: BigInt, side: String)
case class Sum(id: BigInt, day: BigInt, side: String, day_sum: BigInt)

val left_grouped_df = left_df.groupBy("id", "day").as[Key, Value]
val right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day").as[Key, Sum]

val df = left_grouped_df.cogroup(right_grouped_df)((key: Key, left: Iterator[Value], right: Iterator[Sum]) => left)

df.explain()
df.show(5)
```

Output was

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...
+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
```

Output now is

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
```

Spark 3.3 [reworked](https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76) `HashClusteredDistribution` in #32875 and is therefore not sensitive to this issue when using `ClusteredDistribution`.

This fixes a correctness issue.

Tested by a unit test in `EnsureRequirementsSuite`.

Closes #39717 from EnricoMi/branch-3.2-cogroup-window-bug.

Authored-by: Enrico Minack
Signed-off-by: Hyukjin Kwon
---
 .../tests/pandas/test_pandas_cogrouped_map.py | 47 +++++++++++++++-
 .../exchange/EnsureRequirementsSuite.scala    | 56 +++++++++++++++++++
 2 files changed, 102 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
index d92a105f5d4b..93188a769a73 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
@@ -18,8 +18,9 @@
 import unittest
 from typing import cast
 
-from pyspark.sql.functions import array, explode, col, lit, udf, pandas_udf
+from pyspark.sql.functions import array, explode, col, lit, udf, pandas_udf, sum
 from pyspark.sql.types import DoubleType, StructType, StructField, Row
+from pyspark.sql.window import Window
 from pyspark.errors import IllegalArgumentException, PythonException
 from pyspark.testing.sqlutils import (
     ReusedSQLTestCase,
@@ -365,6 +366,50 @@ def test_self_join(self):
 
         self.assertEqual(row.asDict(), Row(column=2, value=2).asDict())
 
+    def test_with_window_function(self):
+        # SPARK-42168: a window function with same partition keys but differing key order
+        ids = 2
+        days = 100
+        vals = 10000
+        parts = 10
+
+        id_df = self.spark.range(ids)
+        day_df = self.spark.range(days).withColumnRenamed("id", "day")
+        vals_df = self.spark.range(vals).withColumnRenamed("id", "value")
+        df = id_df.join(day_df).join(vals_df)
+
+        left_df = df.withColumnRenamed("value", "left").repartition(parts).cache()
+        # SPARK-42132: this bug requires us to alias all columns from df here
+        right_df = df.select(
+            col("id").alias("id"), col("day").alias("day"), col("value").alias("right")
+        ).repartition(parts).cache()
+
+        # note the column order is different to the groupBy("id", "day") column order below
+        window = Window.partitionBy("day", "id")
+
+        left_grouped_df = left_df.groupBy("id", "day")
+        right_grouped_df = right_df \
+            .withColumn("day_sum", sum(col("day")).over(window)) \
+            .groupBy("id", "day")
+
+        def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
+            return pd.DataFrame([{
+                "id": left["id"][0] if not left.empty else (
+                    right["id"][0] if not right.empty else None
+                ),
+                "day": left["day"][0] if not left.empty else (
+                    right["day"][0] if not right.empty else None
+                ),
+                "lefts": len(left.index),
+                "rights": len(right.index)
+            }])
+
+        df = left_grouped_df.cogroup(right_grouped_df) \
+            .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")
+
+        actual = df.orderBy("id", "day").take(days)
+        self.assertEqual(actual, [Row(0, day, vals, vals) for day in range(days)])
+
     @staticmethod
     def _test_with_key(left, right, isLeft):
         def right_assign_key(key, lft, rgt):
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index bc1fd7a5fa5b..844037339ab9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.execution.exchange
 
+import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.statsEstimation.StatsTestPlan
@@ -25,9 +27,12 @@ import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.execution.{DummySparkPlan, SortExec}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.execution.python.FlatMapCoGroupsInPandasExec
+import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
 class EnsureRequirementsSuite extends SharedSparkSession {
   private val exprA = Literal(1)
@@ -1104,6 +1109,57 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-42168: FlatMapCoGroupInPandas and Window function with differing key order") {
+    val lKey = AttributeReference("key", IntegerType)()
+    val lKey2 = AttributeReference("key2", IntegerType)()
+
+    val rKey = AttributeReference("key", IntegerType)()
+    val rKey2 = AttributeReference("key2", IntegerType)()
+    val rValue = AttributeReference("value", IntegerType)()
+
+    val left = DummySparkPlan()
+    val right = WindowExec(
+      Alias(
+        WindowExpression(
+          Sum(rValue).toAggregateExpression(),
+          WindowSpecDefinition(
+            Seq(rKey2, rKey),
+            Nil,
+            SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
+          )
+        ), "sum")() :: Nil,
+      Seq(rKey2, rKey),
+      Nil,
+      DummySparkPlan()
+    )
+
+    val pythonUdf = PythonUDF("pyUDF", null,
+      StructType(Seq(StructField("value", IntegerType))),
+      Seq.empty,
+      PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
+      true)
+
+    val flapMapCoGroup = FlatMapCoGroupsInPandasExec(
+      Seq(lKey, lKey2),
+      Seq(rKey, rKey2),
+      pythonUdf,
+      AttributeReference("value", IntegerType)() :: Nil,
+      left,
+      right
+    )
+
+    val result = EnsureRequirements.apply(flapMapCoGroup)
+    result match {
+      case FlatMapCoGroupsInPandasExec(leftKeys, rightKeys, _, _,
+          SortExec(leftOrder, false, _, _), SortExec(rightOrder, false, _, _)) =>
+        assert(leftKeys === Seq(lKey, lKey2))
+        assert(rightKeys === Seq(rKey, rKey2))
+        assert(leftKeys.map(k => SortOrder(k, Ascending)) === leftOrder)
+        assert(rightKeys.map(k => SortOrder(k, Ascending)) === rightOrder)
+      case other => fail(other.toString)
+    }
+  }
+
   def bucket(numBuckets: Int, expr: Expression): TransformExpression = {
     TransformExpression(BucketFunction, Seq(expr), Some(numBuckets))
   }

From e26ab152ce6d76ba2d8e1b6229e1c736aad65341 Mon Sep 17 00:00:00 2001
From: Enrico Minack
Date: Fri, 27 Jan 2023 07:49:42 +0100
Subject: [PATCH 2/2] Reformat python

---
 .../tests/pandas/test_pandas_cogrouped_map.py | 45 +++++++++++--------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
index 93188a769a73..5cbc9e1caa43 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
@@ -380,32 +380,39 @@ def test_with_window_function(self):
 
         left_df = df.withColumnRenamed("value", "left").repartition(parts).cache()
         # SPARK-42132: this bug requires us to alias all columns from df here
-        right_df = df.select(
-            col("id").alias("id"), col("day").alias("day"), col("value").alias("right")
-        ).repartition(parts).cache()
+        right_df = (
+            df.select(col("id").alias("id"), col("day").alias("day"), col("value").alias("right"))
+            .repartition(parts)
+            .cache()
+        )
 
         # note the column order is different to the groupBy("id", "day") column order below
         window = Window.partitionBy("day", "id")
 
         left_grouped_df = left_df.groupBy("id", "day")
-        right_grouped_df = right_df \
-            .withColumn("day_sum", sum(col("day")).over(window)) \
-            .groupBy("id", "day")
+        right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy(
+            "id", "day"
+        )
 
         def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
-            return pd.DataFrame([{
-                "id": left["id"][0] if not left.empty else (
-                    right["id"][0] if not right.empty else None
-                ),
-                "day": left["day"][0] if not left.empty else (
-                    right["day"][0] if not right.empty else None
-                ),
-                "lefts": len(left.index),
-                "rights": len(right.index)
-            }])
-
-        df = left_grouped_df.cogroup(right_grouped_df) \
-            .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")
+            return pd.DataFrame(
+                [
+                    {
+                        "id": left["id"][0]
+                        if not left.empty
+                        else (right["id"][0] if not right.empty else None),
+                        "day": left["day"][0]
+                        if not left.empty
+                        else (right["day"][0] if not right.empty else None),
+                        "lefts": len(left.index),
+                        "rights": len(right.index),
+                    }
+                ]
+            )
+
+        df = left_grouped_df.cogroup(right_grouped_df).applyInPandas(
+            cogroup, schema="id long, day long, lefts integer, rights integer"
+        )
 
         actual = df.orderBy("id", "day").take(days)
         self.assertEqual(actual, [Row(0, day, vals, vals) for day in range(days)])