
Commit 4872b63

Add user guide for groupby apply
1 parent b33346c commit 4872b63


docs/sql-programming-guide.md

Lines changed: 101 additions & 4 deletions
@@ -1661,7 +1661,7 @@ installed and available on all cluster node Python environments. The current sup

## How to Enable for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
`toPandas()` and when creating a Spark DataFrame from Pandas with `createDataFrame(pandas_df)`.
To use Arrow when executing these calls, it must first be enabled by setting the Spark conf
'spark.sql.execution.arrow.enabled' to 'true'; this is disabled by default.
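
For illustration, a minimal sketch of both conversions with Arrow enabled, assuming an active `SparkSession` named `spark` and that `numpy` and `pandas` are installed:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers (disabled by default)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
{% endhighlight %}
</div>
</div>
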
@@ -1743,7 +1743,104 @@ df.select(multiply(col("x"), col("x"))).show()
</div>
</div>

## GroupBy-Apply

GroupBy-Apply implements the "split-apply-combine" pattern, which consists of three steps:

* Split the data into groups by using `DataFrame.groupBy`.
* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
  input data contains all the rows and columns for each group.
* Combine the results into a new `DataFrame`.

To use GroupBy-Apply, a user needs to define the following (a minimal skeleton is sketched after this
list; the examples below it fill in concrete logic):

* A Python function that defines the computation for each group
* A `StructType` object or a string that defines the schema of the output `DataFrame`

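A minimal skeleton of this structure, assuming a DataFrame `df` with a `long` column `id` and a `double`
column `v` (the function name `normalize` and its identity body are placeholders):

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Placeholder group map function: receives one group as a pandas.DataFrame
# and must return a pandas.DataFrame matching the declared output schema.
@pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
def normalize(pdf):
    return pdf

df.groupby("id").apply(normalize)
{% endhighlight %}
</div>
</div>
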
Examples:

The first example shows a simple use case: subtracting the mean from each value in the group.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame containing all rows and columns of one group
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

{% endhighlight %}
</div>
</div>
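
For comparison, here is a rough local-pandas sketch of the same split-apply-combine (not part of the Spark
example; the name `local_pdf` is hypothetical). pandas applies the function to each group on a single
machine, whereas GroupBy-Apply distributes the groups across the cluster:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
import pandas as pd

local_pdf = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

# Same split-apply-combine, but executed locally by pandas
local_pdf.groupby("id", group_keys=False).apply(subtract_mean)
{% endhighlight %}
</div>
</div>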

The second example is more complicated. It shows how to run an OLS linear regression
for each group using statsmodels. For each group, we calculate beta b = (b1, b2) for X = (x1, x2)
according to the statistical model Y = bX + c.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
import pandas as pd
import statsmodels.api as sm
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0, 1.0, -3.0),
     (1, 2.0, 2.0, -6.0),
     (2, 3.0, 6.0, -6.0),
     (2, 5.0, 10.0, -10.0),
     (2, 10.0, 20.0, -20.0)],
    ("id", "y", "x1", "x2"))

group_column = "id"
y_column = "y"
x_columns = ["x1", "x2"]
schema = df.select(group_column, *x_columns).schema

# Input/output are both a pandas.DataFrame
@pandas_udf(schema, PandasUDFType.GROUP_MAP)
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    Y = pdf[y_column]
    X = pdf[x_columns]
    # Add an intercept term c to the model Y = bX + c
    X = sm.add_constant(X)
    model = sm.OLS(Y, X).fit()

    print(model.params)
    print(model.summary())
    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]])

beta = df.groupby(group_column).apply(ols)

beta.show()
# +---+-------------------+--------------------+
# | id|                 x1|                  x2|
# +---+-------------------+--------------------+
# |  1|0.10000000000000003| -0.3000000000000001|
# |  2|0.24999999999999997|-0.24999999999999997|
# +---+-------------------+--------------------+

{% endhighlight %}
</div>
</div>
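
As in the first example, the output schema could also be written as a DDL-formatted string instead of
building a `StructType` from the DataFrame. A sketch of the equivalent declaration, assuming the column
types shown above:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
# Equivalent declaration using a DDL-formatted schema string
@pandas_udf("id long, x1 double, x2 double", PandasUDFType.GROUP_MAP)
def ols(pdf):
    ...
{% endhighlight %}
</div>
</div>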

For detailed usage, please see `pyspark.sql.functions.pandas_udf` and
`pyspark.sql.GroupedData.apply`.

## Usage Notes

@@ -1786,7 +1883,7 @@ values will be truncated.
Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in `pandas_udf`s to get the best performance; see
[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
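
For illustration, a small sketch of that recommendation using a scalar Pandas UDF over a timestamp
column (the function name `truncate_to_day` and the day-truncation logic are only examples):

{% highlight python %}
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Operates on a pandas.Series of timestamps with the vectorized .dt accessor
# instead of per-element Python datetime manipulation.
@pandas_udf("timestamp", PandasUDFType.SCALAR)
def truncate_to_day(s):
    return s.dt.floor("D")
{% endhighlight %}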

# Migration Guide

@@ -1936,7 +2033,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
- In PySpark, Pandas 0.19.2 or higher is now required if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
- In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
- In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignored it and returned the original Dataset/DataFrame.
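
  For illustration, a small sketch of the new behavior (the column name `flag` is hypothetical):

  {% highlight python %}
  df = spark.createDataFrame([(True,), (None,)], ["flag"])
  df.na.fill(True).show()  # nulls in boolean columns are replaced with True
  # +----+
  # |flag|
  # +----+
  # |true|
  # |true|
  # +----+
  {% endhighlight %}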
- Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
- Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returned a string regardless of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`.
- Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returned a string regardless of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`.
