python/pyspark/sql/connect/dataframe.py (0 additions, 4 deletions)
@@ -2184,10 +2184,6 @@ def _test() -> None:
os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.sql.connect.dataframe.__dict__.copy()
-# Spark Connect does not support RDD but the tests depend on them.
-del pyspark.sql.connect.dataframe.DataFrame.coalesce.__doc__
-del pyspark.sql.connect.dataframe.DataFrame.repartition.__doc__
-del pyspark.sql.connect.dataframe.DataFrame.repartitionByRange.__doc__

# TODO(SPARK-41625): Support Structured Streaming
del pyspark.sql.connect.dataframe.DataFrame.isStreaming.__doc__
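The four `del ...__doc__` lines removed above existed because doctest only collects examples from docstrings that are present: deleting a method's `__doc__` hides its examples from the test runner. Once the examples below stop using the RDD API, Spark Connect can execute them directly and the workaround can go. A minimal, self-contained sketch of that mechanism (the `add` function is a hypothetical stand-in, not from the PR):

```python
# Sketch: doctest collects examples from docstrings, so deleting __doc__
# removes a function's examples from the run. `add` is a made-up stand-in
# for a DataFrame method.
import doctest


def add(a: int, b: int) -> int:
    """
    >>> add(1, 2)
    3
    """
    return a + b


del add.__doc__  # same trick as the deleted lines above

if __name__ == "__main__":
    results = doctest.testmod()
    print(results)  # TestResults(failed=0, attempted=0) -- nothing collected
```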
python/pyspark/sql/dataframe.py (99 additions, 18 deletions)
@@ -1758,9 +1758,27 @@ def coalesce(self, numPartitions: int) -> "DataFrame":

Examples
--------
->>> df = spark.range(10)
->>> df.coalesce(1).rdd.getNumPartitions()
-1
+>>> from pyspark.sql import functions as sf
+>>> spark.range(0, 10, 1, 3).select(
+...     sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++---------+
+|partition|
++---------+
+|        0|
+|        1|
+|        2|
++---------+
+
+>>> from pyspark.sql import functions as sf
+>>> spark.range(0, 10, 1, 3).coalesce(1).select(
+...     sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++---------+
+|partition|
++---------+
+|        0|
++---------+
"""
return DataFrame(self._jdf.coalesce(numPartitions), self.sparkSession)
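The new doctest pattern above works because `spark_partition_id()` is evaluated on both classic PySpark and Spark Connect, whereas `DataFrame.rdd` exists only in classic PySpark. A hedged sketch of the substitution, assuming a local classic session (names here are illustrative, not from the PR):

```python
# Sketch: count partitions without the RDD API. On Spark Connect the
# spark_partition_id() path works unchanged, while the .rdd path would fail.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.master("local[4]").appName("sketch").getOrCreate()

df = spark.range(0, 10, 1, 3)  # 10 rows spread over 3 partitions

# Classic-only: DataFrame.rdd is unavailable on Connect.
print(df.rdd.getNumPartitions())  # 3

# Portable: tag each row with its partition ID, then count distinct IDs.
# Note this counts *non-empty* partitions; empty ones contribute no rows.
print(df.select(sf.spark_partition_id()).distinct().count())  # 3

spark.stop()
```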

@@ -1804,23 +1822,78 @@ def repartition(  # type: ignore[misc]

Examples
--------
->>> df = spark.createDataFrame(
-...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+>>> from pyspark.sql import functions as sf
+>>> df = spark.range(0, 64, 1, 9).withColumn(
+...     "name", sf.concat(sf.lit("name_"), sf.col("id").cast("string"))
+... ).withColumn(
+...     "age", sf.col("id") - 32
+... )
+>>> df.select(
+...     sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++---------+
+|partition|
++---------+
+|        0|
+|        1|
+|        2|
+|        3|
+|        4|
+|        5|
+|        6|
+|        7|
+|        8|
++---------+

Repartition the data into 10 partitions.

->>> df.repartition(10).rdd.getNumPartitions()
-10
+>>> df.repartition(10).select(
+...     sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++---------+
+|partition|
++---------+
+|        0|
+|        1|
+|        2|
+|        3|
+|        4|
+|        5|
+|        6|
+|        7|
+|        8|
+|        9|
++---------+

Repartition the data into 7 partitions by the 'age' column.

->>> df.repartition(7, "age").rdd.getNumPartitions()
-7
+>>> df.repartition(7, "age").select(
+...     sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++---------+
+|partition|
++---------+
+|        0|
+|        1|
+|        2|
+|        3|
+|        4|
+|        5|
+|        6|
++---------+

Repartition the data into 3 partitions by the 'name' and 'age' columns.

->>> df.repartition(3, "name", "age").rdd.getNumPartitions()
-3
+>>> df.repartition(3, "name", "age").select(
+...     sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++---------+
+|partition|
++---------+
+|        0|
+|        1|
+|        2|
++---------+
"""
if isinstance(numPartitions, int):
if len(cols) == 0:
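A property worth noting in the column-based examples above: `repartition(n, cols)` uses hash partitioning, so rows with equal key columns always land in the same partition, but which partition ID a given key gets is not guaranteed; that is why the doctests assert only the set of distinct IDs, not row placement. A small sketch of that invariant (the duplicate row and session setup are my additions):

```python
# Sketch: hash repartitioning co-locates equal keys. Which partition a key
# maps to can vary across Spark versions, so only co-location is asserted.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.master("local[4]").getOrCreate()

df = spark.createDataFrame(
    [(14, "Tom"), (23, "Alice"), (16, "Bob"), (23, "Alice")],  # duplicate key
    ["age", "name"],
)

tagged = df.repartition(3, "name", "age").withColumn(
    "partition", sf.spark_partition_id()
)

# Both ("Alice", 23) rows must report the same partition ID.
alice = tagged.filter(sf.col("name") == "Alice").select("partition").distinct()
assert alice.count() == 1

spark.stop()
```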
@@ -1888,15 +1961,23 @@ def repartitionByRange(  # type: ignore[misc]

Examples
--------
->>> df = spark.createDataFrame(
-...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
-
Repartition the data into 2 partitions by range in the 'age' column.
-For example, the first partition can have ``(14, "Tom")``, and the second
-partition would have ``(16, "Bob")`` and ``(23, "Alice")``.
+For example, the first partition can have ``(14, "Tom")`` and ``(16, "Bob")``,
+and the second partition would have ``(23, "Alice")``.

->>> df.repartitionByRange(2, "age").rdd.getNumPartitions()
-2
+>>> from pyspark.sql import functions as sf
+>>> spark.createDataFrame(
+...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]
+... ).repartitionByRange(2, "age").select(
+...     "age", "name", sf.spark_partition_id()
+... ).show()
++---+-----+--------------------+
+|age| name|SPARK_PARTITION_ID()|
++---+-----+--------------------+
+| 14|  Tom|                   0|
+| 16|  Bob|                   0|
+| 23|Alice|                   1|
++---+-----+--------------------+
"""
if isinstance(numPartitions, int):
if len(cols) == 0:
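Unlike the hash-based `repartition`, `repartitionByRange` samples the sort key and assigns rows to contiguous, ordered ranges, which is why ages 14 and 16 can share partition 0 while 23 gets partition 1 in the table above. A sketch of that behavior on the same data (the session setup and the `pid` alias are assumptions, not from the PR):

```python
# Sketch: range partitioning keeps contiguous key ranges together, so
# partition IDs increase with the sort key; the exact split points come
# from sampling and may vary between runs.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.master("local[4]").getOrCreate()

df = spark.createDataFrame(
    [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]
)

(
    df.repartitionByRange(2, "age")
    .select("age", "name", sf.spark_partition_id().alias("pid"))
    .show()
)
# Expected shape: the two lowest ages share pid 0 and the highest gets
# pid 1, matching the docstring's table.

spark.stop()
```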