From 034037a189c697f1fc8966f72fad03b56b05fde9 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Fri, 1 Sep 2023 18:49:35 +0800
Subject: [PATCH 1/2] init init

---
 python/pyspark/sql/connect/dataframe.py |  4 ----
 python/pyspark/sql/dataframe.py         | 35 +++++++++++++++++--------
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index c42de589f8de..655b7eddc3a4 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -2184,10 +2184,6 @@ def _test() -> None:
     os.chdir(os.environ["SPARK_HOME"])

     globs = pyspark.sql.connect.dataframe.__dict__.copy()
-    # Spark Connect does not support RDD but the tests depend on them.
-    del pyspark.sql.connect.dataframe.DataFrame.coalesce.__doc__
-    del pyspark.sql.connect.dataframe.DataFrame.repartition.__doc__
-    del pyspark.sql.connect.dataframe.DataFrame.repartitionByRange.__doc__

     # TODO(SPARK-41625): Support Structured Streaming
     del pyspark.sql.connect.dataframe.DataFrame.isStreaming.__doc__
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 64592311a132..5d9bfc919df5 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1758,9 +1758,10 @@ def coalesce(self, numPartitions: int) -> "DataFrame":

         Examples
         --------
-        >>> df = spark.range(10)
-        >>> df.coalesce(1).rdd.getNumPartitions()
-        1
+        >>> spark.range(10).coalesce(1).explain()
+        == Physical Plan ==
+        Coalesce 1
+        +- ... Range (0, 10, step=1, splits=...)
         """
         return DataFrame(self._jdf.coalesce(numPartitions), self.sparkSession)

@@ -1809,18 +1810,27 @@ def repartition(  # type: ignore[misc]

         Repartition the data into 10 partitions.

-        >>> df.repartition(10).rdd.getNumPartitions()
-        10
+        >>> df.repartition(10).explain()
+        == Physical Plan ==
+        AdaptiveSparkPlan isFinalPlan=false
+        +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, ...
+           ...

         Repartition the data into 7 partitions by 'age' column.

-        >>> df.repartition(7, "age").rdd.getNumPartitions()
-        7
+        >>> df.repartition(7, "age").explain()
+        == Physical Plan ==
+        AdaptiveSparkPlan isFinalPlan=false
+        +- Exchange hashpartitioning(age..., 7), REPARTITION_BY_NUM, ...
+           ...

         Repartition the data into 3 partitions by 'name' and 'age' columns.

-        >>> df.repartition(3, "name", "age").rdd.getNumPartitions()
-        3
+        >>> df.repartition(3, "name", "age").explain()
+        == Physical Plan ==
+        AdaptiveSparkPlan isFinalPlan=false
+        +- Exchange hashpartitioning(name..., age..., 3), REPARTITION_BY_NUM, ...
+           ...
         """
         if isinstance(numPartitions, int):
             if len(cols) == 0:
@@ -1895,8 +1905,11 @@ def repartitionByRange(  # type: ignore[misc]
         For example, the first partition can have ``(14, "Tom")``, and the second
         partition would have ``(16, "Bob")`` and ``(23, "Alice")``.

-        >>> df.repartitionByRange(2, "age").rdd.getNumPartitions()
-        2
+        >>> df.repartitionByRange(2, "age").explain()
+        == Physical Plan ==
+        AdaptiveSparkPlan isFinalPlan=false
+        +- Exchange rangepartitioning(age... ASC NULLS FIRST, 2), REPARTITION_BY_NUM, ...
+           ...
""" if isinstance(numPartitions, int): if len(cols) == 0: From fad313ad70074d4d691508b70dbfb9e27e2f0d4a Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Mon, 4 Sep 2023 18:18:31 +0800 Subject: [PATCH 2/2] address comments --- python/pyspark/sql/dataframe.py | 130 ++++++++++++++++++++++++-------- 1 file changed, 99 insertions(+), 31 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5d9bfc919df5..1302be961ce2 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1758,10 +1758,27 @@ def coalesce(self, numPartitions: int) -> "DataFrame": Examples -------- - >>> spark.range(10).coalesce(1).explain() - == Physical Plan == - Coalesce 1 - +- ... Range (0, 10, step=1, splits=...) + >>> from pyspark.sql import functions as sf + >>> spark.range(0, 10, 1, 3).select( + ... sf.spark_partition_id().alias("partition") + ... ).distinct().sort("partition").show() + +---------+ + |partition| + +---------+ + | 0| + | 1| + | 2| + +---------+ + + >>> from pyspark.sql import functions as sf + >>> spark.range(0, 10, 1, 3).coalesce(1).select( + ... sf.spark_partition_id().alias("partition") + ... ).distinct().sort("partition").show() + +---------+ + |partition| + +---------+ + | 0| + +---------+ """ return DataFrame(self._jdf.coalesce(numPartitions), self.sparkSession) @@ -1805,32 +1822,78 @@ def repartition( # type: ignore[misc] Examples -------- - >>> df = spark.createDataFrame( - ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + >>> from pyspark.sql import functions as sf + >>> df = spark.range(0, 64, 1, 9).withColumn( + ... "name", sf.concat(sf.lit("name_"), sf.col("id").cast("string")) + ... ).withColumn( + ... "age", sf.col("id") - 32 + ... ) + >>> df.select( + ... sf.spark_partition_id().alias("partition") + ... ).distinct().sort("partition").show() + +---------+ + |partition| + +---------+ + | 0| + | 1| + | 2| + | 3| + | 4| + | 5| + | 6| + | 7| + | 8| + +---------+ Repartition the data into 10 partitions. - >>> df.repartition(10).explain() - == Physical Plan == - AdaptiveSparkPlan isFinalPlan=false - +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, ... - ... + >>> df.repartition(10).select( + ... sf.spark_partition_id().alias("partition") + ... ).distinct().sort("partition").show() + +---------+ + |partition| + +---------+ + | 0| + | 1| + | 2| + | 3| + | 4| + | 5| + | 6| + | 7| + | 8| + | 9| + +---------+ Repartition the data into 7 partitions by 'age' column. - >>> df.repartition(7, "age").explain() - == Physical Plan == - AdaptiveSparkPlan isFinalPlan=false - +- Exchange hashpartitioning(age..., 7), REPARTITION_BY_NUM, ... - ... + >>> df.repartition(7, "age").select( + ... sf.spark_partition_id().alias("partition") + ... ).distinct().sort("partition").show() + +---------+ + |partition| + +---------+ + | 0| + | 1| + | 2| + | 3| + | 4| + | 5| + | 6| + +---------+ Repartition the data into 7 partitions by 'age' and 'name columns. - >>> df.repartition(3, "name", "age").explain() - == Physical Plan == - AdaptiveSparkPlan isFinalPlan=false - +- Exchange hashpartitioning(name..., age..., 3), REPARTITION_BY_NUM, ... - ... + >>> df.repartition(3, "name", "age").select( + ... sf.spark_partition_id().alias("partition") + ... 
+        ... ).distinct().sort("partition").show()
+        +---------+
+        |partition|
+        +---------+
+        |        0|
+        |        1|
+        |        2|
+        +---------+
         """
         if isinstance(numPartitions, int):
             if len(cols) == 0:
@@ -1898,18 +1961,23 @@ def repartitionByRange(  # type: ignore[misc]

         Examples
         --------
-        >>> df = spark.createDataFrame(
-        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
-
         Repartition the data into 2 partitions by range in 'age' column.
-        For example, the first partition can have ``(14, "Tom")``, and the second
-        partition would have ``(16, "Bob")`` and ``(23, "Alice")``.
+        For example, the first partition can have ``(14, "Tom")`` and ``(16, "Bob")``,
+        and the second partition would have ``(23, "Alice")``.

-        >>> df.repartitionByRange(2, "age").explain()
-        == Physical Plan ==
-        AdaptiveSparkPlan isFinalPlan=false
-        +- Exchange rangepartitioning(age... ASC NULLS FIRST, 2), REPARTITION_BY_NUM, ...
-           ...
+        >>> from pyspark.sql import functions as sf
+        >>> spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]
+        ... ).repartitionByRange(2, "age").select(
+        ...     "age", "name", sf.spark_partition_id()
+        ... ).show()
+        +---+-----+--------------------+
+        |age| name|SPARK_PARTITION_ID()|
+        +---+-----+--------------------+
+        | 14|  Tom|                   0|
+        | 16|  Bob|                   0|
+        | 23|Alice|                   1|
+        +---+-----+--------------------+
         """
         if isinstance(numPartitions, int):
             if len(cols) == 0:
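
For reference (not part of the patches themselves): the doctest pattern both commits converge on can also be exercised as a standalone script. Below is a minimal sketch, assuming only a local SparkSession; the names `before` and `after` are illustrative. Unlike the removed `rdd.getNumPartitions()` checks, it stays entirely on the DataFrame API, which is why it also runs against a Spark Connect session.

# Minimal sketch of the partition-inspection pattern used by the new doctests.
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.getOrCreate()

# range(start, end, step, numPartitions): 10 rows spread over 3 partitions.
df = spark.range(0, 10, 1, 3)

# spark_partition_id() tags each row with its partition id; counting the
# distinct ids yields the partition count without touching the RDD API.
before = df.select(sf.spark_partition_id()).distinct().count()
after = df.coalesce(1).select(sf.spark_partition_id()).distinct().count()
print(before, after)  # 3 1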