35 changes: 35 additions & 0 deletions docs/sql-pyspark-pandas-with-arrow.md
@@ -178,6 +178,41 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).


### Cogrouped Map

Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key and then a Python function applied to
each cogroup. They are used with `groupBy().cogroup().apply()`, which consists of the following steps:

* Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
* Apply a function to each cogroup. The input of the function is two `pandas.DataFrame`s (with an optional tuple
  representing the key). The output of the function is a `pandas.DataFrame`.
* Combine the `pandas.DataFrame`s from all groups into a new `DataFrame`.

To use `groupBy().cogroup().apply()`, the user needs to define the following:
* A Python function that defines the computation for each cogroup.
* A `StructType` object or a string that defines the schema of the output `DataFrame`.

The column labels of the returned `pandas.DataFrame` must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
for how to label columns when constructing a `pandas.DataFrame`.
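
For instance, here is a minimal sketch of the two labeling options (the schema, function bodies, and column names are illustrative only, not part of the example below):

```python
import pandas as pd

# Output schema declared as "id long, v double" (string field names):
# the returned pandas.DataFrame must use matching column labels.
def summarize_by_name(l, r):
    return pd.DataFrame({"id": l["id"], "v": l["v"]})

# With non-string column labels (here the default integer labels 0 and 1),
# fields are matched to the schema by position and data type instead.
def summarize_by_position(l, r):
    return pd.DataFrame([[1, 1.0]])
```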

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to
out-of-memory exceptions, especially if the group sizes are skewed. The configuration for [maxRecordsPerBatch](#setting-arrow-batch-size)
is not applied to cogroups, and it is up to the user to ensure that the cogrouped data will fit into the available memory.
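
One way to spot such skew before applying the UDF is to inspect the group sizes directly, as in this minimal sketch (the key column `id` matches the example below):

```python
from pyspark.sql import functions as F

# Count rows per key on each side; a few very large counts indicate
# cogroups that may not fit in executor memory once both sides are loaded.
df1.groupBy("id").count().orderBy(F.desc("count")).show(5)
df2.groupBy("id").count().orderBy(F.desc("count")).show(5)
```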

The following example shows how to use `groupBy().cogroup().apply()` to perform an asof join between two datasets.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example cogrouped_map_pandas_udf python/sql/arrow.py %}
</div>
</div>

For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
[`pyspark.sql.CoGroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.CoGroupedData.apply).


## Usage Notes

### Supported SQL Types
32 changes: 32 additions & 0 deletions examples/src/main/python/sql/arrow.py
@@ -258,6 +258,36 @@ def filter_func(batch_iter):
    # $example off:map_iter_pandas_udf$


def cogrouped_map_pandas_udf_example(spark):
    # $example on:cogrouped_map_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df1 = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"))

    df2 = spark.createDataFrame(
        [(20000101, 1, "x"), (20000101, 2, "y")],
        ("time", "id", "v2"))

    @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
    def asof_join(l, r):
        return pd.merge_asof(l, r, on="time", by="id")

    df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
    # +--------+---+---+---+
    # |    time| id| v1| v2|
    # +--------+---+---+---+
    # |20000101|  1|1.0|  x|
    # |20000102|  1|3.0|  x|
    # |20000101|  2|2.0|  y|
    # |20000102|  2|4.0|  y|
    # +--------+---+---+---+
    # $example off:cogrouped_map_pandas_udf$


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
@@ -276,5 +306,7 @@ def filter_func(batch_iter):
    grouped_agg_pandas_udf_example(spark)
    print("Running pandas_udf map iterator example")
    map_iter_pandas_udf_example(spark)
    print("Running pandas_udf cogrouped map example")
    cogrouped_map_pandas_udf_example(spark)

    spark.stop()
52 changes: 52 additions & 0 deletions python/pyspark/sql/functions.py
@@ -3233,6 +3233,58 @@ def pandas_udf(f=None, returnType=None, functionType=None):
| 1| 21|
+---+---+

6. COGROUPED_MAP

A cogrouped map UDF defines a transformation: (`pandas.DataFrame`, `pandas.DataFrame`) ->
`pandas.DataFrame`. The `returnType` should be a :class:`StructType` describing the schema
of the returned `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame`
must either match the field names in the defined `returnType` schema if specified as strings,
or match the field data types by position if not strings, e.g. integer indices. The length
of the returned `pandas.DataFrame` can be arbitrary.

Cogrouped map UDFs are used with :meth:`pyspark.sql.CoGroupedData.apply`.

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df1 = spark.createDataFrame(
...     [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
...     ("time", "id", "v1"))
>>> df2 = spark.createDataFrame(
...     [(20000101, 1, "x"), (20000101, 2, "y")],
...     ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string",
...             PandasUDFType.COGROUPED_MAP) # doctest: +SKIP
... def asof_join(l, r):
...     return pd.merge_asof(l, r, on="time", by="id")
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+

Alternatively, the user can define a function that takes three arguments. In this case,
the grouping key(s) will be passed as the first argument and the data will be passed as the
second and third arguments. The grouping key(s) will be passed as a tuple of numpy data
types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
`pandas.DataFrame`s containing all columns from the original Spark DataFrames.

>>> @pandas_udf("time int, id int, v1 double, v2 string",
...             PandasUDFType.COGROUPED_MAP) # doctest: +SKIP
... def asof_join(k, l, r):
...     if k == (1,):
...         return pd.merge_asof(l, r, on="time", by="id")
...     else:
...         return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
+--------+---+---+---+

.. note:: The user-defined functions are considered deterministic by default. Due to
optimization, duplicate invocations may be eliminated or the function may even be invoked
more times than it is present in the query. If your function is not deterministic, call