diff --git a/docs/sql-pyspark-pandas-with-arrow.md b/docs/sql-pyspark-pandas-with-arrow.md
index 7f01483d40583..d638278b42355 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -178,6 +178,41 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p
[`pyspark.sql.DataFrame.mapsInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
+### Cogrouped Map
+
+Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key and then a python function applied to
+each cogroup. They are used with `groupBy().cogroup().apply()` which consists of the following steps:
+
+* Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.
+* Apply a function to each cogroup. The input of the function is two `pandas.DataFrame` (with an optional Tuple
+representing the key). The output of the function is a `pandas.DataFrame`.
+* Combine the pandas.DataFrames from all groups into a new `DataFrame`.
+
+To use `groupBy().cogroup().apply()`, the user needs to define the following:
+* A Python function that defines the computation for each cogroup.
+* A `StructType` object or a string that defines the schema of the output `DataFrame`.
+
+The column labels of the returned `pandas.DataFrame` must either match the field names in the
+defined output schema if specified as strings, or match the field data types by position if not
+strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
+on how to label columns when constructing a `pandas.DataFrame`.
+
+Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
+memory exceptions, especially if the group sizes are skewed. The configuration for [maxRecordsPerBatch](#setting-arrow-batch-size)
+is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.
+
+The following example shows how to use `groupby().cogroup().apply()` to perform an asof join between two datasets.
+
+
+
+{% include_example cogrouped_map_pandas_udf python/sql/arrow.py %}
+
+
+
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
+[`pyspark.sql.CoGroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.CoGroupedData.apply).
+
+
## Usage Notes
### Supported SQL Types
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index de8d4f755de6f..d5a3173ff9c0a 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -258,6 +258,36 @@ def filter_func(batch_iter):
# $example off:map_iter_pandas_udf$
+def cogrouped_map_pandas_udf_example(spark):
+ # $example on:cogrouped_map_pandas_udf$
+ import pandas as pd
+
+ from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+ df1 = spark.createDataFrame(
+ [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+ ("time", "id", "v1"))
+
+ df2 = spark.createDataFrame(
+ [(20000101, 1, "x"), (20000101, 2, "y")],
+ ("time", "id", "v2"))
+
+ @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
+ def asof_join(l, r):
+ return pd.merge_asof(l, r, on="time", by="id")
+
+ df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
+ # +--------+---+---+---+
+ # | time| id| v1| v2|
+ # +--------+---+---+---+
+ # |20000101| 1|1.0| x|
+ # |20000102| 1|3.0| x|
+ # |20000101| 2|2.0| y|
+ # |20000102| 2|4.0| y|
+ # +--------+---+---+---+
+ # $example off:cogrouped_map_pandas_udf$
+
+
if __name__ == "__main__":
spark = SparkSession \
.builder \
@@ -276,5 +306,7 @@ def filter_func(batch_iter):
grouped_agg_pandas_udf_example(spark)
print("Running pandas_udf map iterator example")
map_iter_pandas_udf_example(spark)
+ print("Running pandas_udf cogrouped map example")
+ cogrouped_map_pandas_udf_example(spark)
spark.stop()
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 95b78175d5561..0ad3a3ad4e6a0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -3233,6 +3233,58 @@ def pandas_udf(f=None, returnType=None, functionType=None):
| 1| 21|
+---+---+
+ 6. COGROUPED_MAP
+
+ A cogrouped map UDF defines transformation: (`pandas.DataFrame`, `pandas.DataFrame`) ->
+ `pandas.DataFrame`. The `returnType` should be a :class:`StructType` describing the schema
+ of the returned `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame`
+ must either match the field names in the defined `returnType` schema if specified as strings,
+ or match the field data types by position if not strings, e.g. integer indices. The length
+ of the returned `pandas.DataFrame` can be arbitrary.
+
+ CoGrouped map UDFs are used with :meth:`pyspark.sql.CoGroupedData.apply`.
+
+ >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+ >>> df1 = spark.createDataFrame(
+ ... [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+ ... ("time", "id", "v1"))
+ >>> df2 = spark.createDataFrame(
+ ... [(20000101, 1, "x"), (20000101, 2, "y")],
+ ... ("time", "id", "v2"))
+ >>> @pandas_udf("time int, id int, v1 double, v2 string",
+ ... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP
+ ... def asof_join(l, r):
+ ... return pd.merge_asof(l, r, on="time", by="id")
+ >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP
+ +---------+---+---+---+
+ | time| id| v1| v2|
+ +---------+---+---+---+
+ | 20000101| 1|1.0| x|
+ | 20000102| 1|3.0| x|
+ | 20000101| 2|2.0| y|
+ | 20000102| 2|4.0| y|
+ +---------+---+---+---+
+
+ Alternatively, the user can define a function that takes three arguments. In this case,
+ the grouping key(s) will be passed as the first argument and the data will be passed as the
+ second and third arguments. The grouping key(s) will be passed as a tuple of numpy data
+ types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
+ `pandas.DataFrame` containing all columns from the original Spark DataFrames.
+ >>> @pandas_udf("time int, id int, v1 double, v2 string",
+ ... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP
+ ... def asof_join(k, l, r):
+ ... if k == (1,):
+ ... return pd.merge_asof(l, r, on="time", by="id")
+ ... else:
+ ... return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
+ >>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP
+ +---------+---+---+---+
+ | time| id| v1| v2|
+ +---------+---+---+---+
+ | 20000101| 1|1.0| x|
+ | 20000102| 1|3.0| x|
+ +---------+---+---+---+
+
.. note:: The user-defined functions are considered deterministic by default. Due to
optimization, duplicate invocations may be eliminated or the function may even be invoked
more times than it is present in the query. If your function is not deterministic, call