
Commit c294943

d80tb7 authored and HyukjinKwon committed
[SPARK-29126][PYSPARK][DOC] Pandas Cogroup udf usage guide
This PR adds some extra documentation for the new cogrouped map Pandas UDFs. Specifically:

- Updated the usage guide for the new `COGROUPED_MAP` Pandas UDFs added in #24981
- Updated the docstring for `pandas_udf` to include the `COGROUPED_MAP` type, as suggested by HyukjinKwon in #25939

Closes #26110 from d80tb7/SPARK-29126-cogroup-udf-usage-guide.

Authored-by: Chris Martin <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent 3206a99 commit c294943

File tree

3 files changed: +119 −0 lines changed

docs/sql-pyspark-pandas-with-arrow.md

Lines changed: 35 additions & 0 deletions
@@ -178,6 +178,41 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p
[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).

### Cogrouped Map

Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key and then a Python function applied to
each cogroup. They are used with `groupBy().cogroup().apply()`, which consists of the following steps:

* Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
* Apply a function to each cogroup. The input of the function is two `pandas.DataFrame` (with an optional tuple
  representing the key). The output of the function is a `pandas.DataFrame`.
* Combine the `pandas.DataFrame`s from all groups into a new `DataFrame`.

To use `groupBy().cogroup().apply()`, the user needs to define the following:

* A Python function that defines the computation for each cogroup.
* A `StructType` object or a string that defines the schema of the output `DataFrame`.
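For illustration, here is a minimal sketch of those two pieces, not part of this commit: `merge_rows` and the `id`/`v` columns are invented, and it assumes both input DataFrames share those columns. It also shows the schema given as a `StructType` object rather than a DDL string.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# The output schema, given here as a StructType object rather than a DDL string.
schema = StructType([
    StructField("id", IntegerType()),
    StructField("v", DoubleType()),
])

@pandas_udf(schema, PandasUDFType.COGROUPED_MAP)
def merge_rows(left, right):
    # Each cogroup arrives as two pandas.DataFrames; return one pandas.DataFrame
    # whose columns match the schema above.
    return pd.concat([left, right], ignore_index=True)[["id", "v"]]
```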

The column labels of the returned `pandas.DataFrame` must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
on how to label columns when constructing a `pandas.DataFrame`.
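As a sketch of the two labeling options (again not part of the commit; the function names and `id`/`v` columns are invented, assuming an output schema of `"id long, v double"`):

```python
import pandas as pd

# Option 1: the schema was declared with string field names, so the result
# columns are labeled by those names.
def named_labels(l, r):
    return pd.DataFrame({"id": l["id"], "v": l["v"]})

# Option 2: labels that are not strings, e.g. integer indices, are matched to
# the schema fields by position.
def positional_labels(l, r):
    out = pd.DataFrame({"id": l["id"], "v": l["v"]})
    out.columns = range(out.shape[1])  # 0 -> id, 1 -> v; matched by position
    return out
```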

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
memory exceptions, especially if the group sizes are skewed. The configuration for [maxRecordsPerBatch](#setting-arrow-batch-size)
is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.
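For reference, a brief sketch of the setting the note refers to (assuming a live `spark` session; 10000 is the default value and is shown only for illustration):

```python
# spark.sql.execution.arrow.maxRecordsPerBatch bounds Arrow batch sizes for
# other Pandas UDF types, but it is NOT applied to cogrouped map: each whole
# cogroup is still materialized in memory at once.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
```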
203+
The following example shows how to use `groupBy().cogroup().apply()` to perform an asof join between two datasets.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example cogrouped_map_pandas_udf python/sql/arrow.py %}
</div>
</div>

For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
[`pyspark.sql.CoGroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.CoGroupedData.apply).

## Usage Notes

### Supported SQL Types

examples/src/main/python/sql/arrow.py

Lines changed: 32 additions & 0 deletions
@@ -258,6 +258,36 @@ def filter_func(batch_iter):
    # $example off:map_iter_pandas_udf$

def cogrouped_map_pandas_udf_example(spark):
    # $example on:cogrouped_map_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df1 = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"))

    df2 = spark.createDataFrame(
        [(20000101, 1, "x"), (20000101, 2, "y")],
        ("time", "id", "v2"))

    @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
    def asof_join(l, r):
        return pd.merge_asof(l, r, on="time", by="id")

    df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
    # +--------+---+---+---+
    # |    time| id| v1| v2|
    # +--------+---+---+---+
    # |20000101|  1|1.0|  x|
    # |20000102|  1|3.0|  x|
    # |20000101|  2|2.0|  y|
    # |20000102|  2|4.0|  y|
    # +--------+---+---+---+
    # $example off:cogrouped_map_pandas_udf$

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
@@ -276,5 +306,7 @@ def filter_func(batch_iter):
    grouped_agg_pandas_udf_example(spark)
    print("Running pandas_udf map iterator example")
    map_iter_pandas_udf_example(spark)
    print("Running pandas_udf cogrouped map example")
    cogrouped_map_pandas_udf_example(spark)

    spark.stop()

python/pyspark/sql/functions.py

Lines changed: 52 additions & 0 deletions
@@ -3236,6 +3236,58 @@ def pandas_udf(f=None, returnType=None, functionType=None):
       |  1| 21|
       +---+---+

    6. COGROUPED_MAP

       A cogrouped map UDF defines a transformation: (`pandas.DataFrame`, `pandas.DataFrame`) ->
       `pandas.DataFrame`. The `returnType` should be a :class:`StructType` describing the schema
       of the returned `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame`
       must either match the field names in the defined `returnType` schema if specified as strings,
       or match the field data types by position if not strings, e.g. integer indices. The length
       of the returned `pandas.DataFrame` can be arbitrary.

       Cogrouped map UDFs are used with :meth:`pyspark.sql.CoGroupedData.apply`.

       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
       >>> df1 = spark.createDataFrame(
       ...     [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
       ...     ("time", "id", "v1"))
       >>> df2 = spark.createDataFrame(
       ...     [(20000101, 1, "x"), (20000101, 2, "y")],
       ...     ("time", "id", "v2"))
       >>> @pandas_udf("time int, id int, v1 double, v2 string",
       ...             PandasUDFType.COGROUPED_MAP)  # doctest: +SKIP
       ... def asof_join(l, r):
       ...     return pd.merge_asof(l, r, on="time", by="id")
       >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()  # doctest: +SKIP
       +--------+---+---+---+
       |    time| id| v1| v2|
       +--------+---+---+---+
       |20000101|  1|1.0|  x|
       |20000102|  1|3.0|  x|
       |20000101|  2|2.0|  y|
       |20000102|  2|4.0|  y|
       +--------+---+---+---+

       Alternatively, the user can define a function that takes three arguments. In this case,
       the grouping key(s) will be passed as the first argument and the data will be passed as the
       second and third arguments. The grouping key(s) will be passed as a tuple of numpy data
       types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
       `pandas.DataFrame`s containing all columns from the original Spark DataFrames.

       >>> @pandas_udf("time int, id int, v1 double, v2 string",
       ...             PandasUDFType.COGROUPED_MAP)  # doctest: +SKIP
       ... def asof_join(k, l, r):
       ...     if k == (1,):
       ...         return pd.merge_asof(l, r, on="time", by="id")
       ...     else:
       ...         return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
       >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()  # doctest: +SKIP
       +--------+---+---+---+
       |    time| id| v1| v2|
       +--------+---+---+---+
       |20000101|  1|1.0|  x|
       |20000102|  1|3.0|  x|
       +--------+---+---+---+

    .. note:: The user-defined functions are considered deterministic by default. Due to
        optimization, duplicate invocations may be eliminated or the function may even be invoked
        more times than it is present in the query. If your function is not deterministic, call
