105 changes: 101 additions & 4 deletions docs/sql-programming-guide.md
@@ -1661,7 +1661,7 @@ installed and available on all cluster node Python environments. The current sup

## How to Enable for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
`toPandas()` and when creating a Spark DataFrame from Pandas with `createDataFrame(pandas_df)`.
To use Arrow when executing these calls, it must first be enabled by setting the Spark configuration
`spark.sql.execution.arrow.enabled` to `true`; this is disabled by default.
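
As a minimal sketch (assuming `spark` is an existing `SparkSession`; the variable names below are only illustrative), enabling Arrow and converting in both directions looks like this:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers (disabled by default)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
{% endhighlight %}
</div>
</div>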
@@ -1743,7 +1743,104 @@ df.select(multiply(col("x"), col("x"))).show()
</div>
</div>

## GroupBy-Apply

GroupBy-Apply implements the "split-apply-combine" pattern. Split-apply-combine consists of three steps:

* Split the data into groups by using `DataFrame.groupBy`.
* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
input data contains all the rows and columns for each group.
* Combine the results into a new `DataFrame`.

To use GroupBy-Apply, the user needs to define the following:
* A Python function that defines the computation for each group.
* A `StructType` object or a string that defines the schema of the output `DataFrame` (both forms are sketched below).
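
For example, the output schema can be declared either as a DDL-formatted string or as an equivalent `StructType`; a minimal sketch (the field names here are only illustrative):

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# The two declarations below are interchangeable when passed to pandas_udf
schema_as_string = "id long, v double"
schema_as_struct = StructType([
    StructField("id", LongType()),
    StructField("v", DoubleType())])
{% endhighlight %}
</div>
</div>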

Examples:

The first example shows a simple use case: subtracting the mean from each value in the group.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame holding all rows and columns of one group
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

{% endhighlight %}
</div>
</div>

The second example is more involved: it shows how to run an OLS linear regression
for each group using statsmodels. For each group, we calculate beta b = (b1, b2) for X = (x1, x2)
according to the statistical model Y = bX + c.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
import pandas as pd
import statsmodels.api as sm
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0, 1.0, -3.0),
     (1, 2.0, 2.0, -6.0),
     (2, 3.0, 6.0, -6.0),
     (2, 5.0, 10.0, -10.0),
     (2, 10.0, 20.0, -20.0)],
    ("id", "y", "x1", "x2"))

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    Y = pdf[y_column]
    X = pdf[x_columns]
    # Add an intercept term for the regression
    X = sm.add_constant(X)
    model = sm.OLS(Y, X).fit()

    print(model.params)
    print(model.summary())
    # One output row per group: the group key followed by the fitted coefficients
    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]])

beta = df.groupby(group_column).apply(ols)

beta.show()
# +---+-------------------+--------------------+
# | id|                 x1|                  x2|
# +---+-------------------+--------------------+
# |  1|0.10000000000000003| -0.3000000000000001|
# |  2|0.24999999999999997|-0.24999999999999997|
# +---+-------------------+--------------------+

{% endhighlight %}
</div>
</div>

For detailed usage, please see `pyspark.sql.functions.pandas_udf` and
`pyspark.sql.GroupedData.apply`.

## Usage Notes

@@ -1786,7 +1883,7 @@ values will be truncated.
Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in `pandas_udf`s to get the best performance; see
[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
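
For instance, a scalar `pandas_udf` receives a timestamp column as a `pandas.Series`, so the Pandas `.dt` accessor can be used directly. A minimal sketch (the column name `ts` and the UDF below are illustrative, not part of this guide):

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.sql.functions import pandas_udf

# Truncate each timestamp to the start of its day using Pandas time series
# functionality (Series.dt.floor), operating on a whole batch at once.
@pandas_udf("timestamp")
def day_start(ts):
    return ts.dt.floor("D")

df.withColumn("day", day_start(df["ts"])).show()
{% endhighlight %}
</div>
</div>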

# Migration Guide

@@ -1936,7 +2033,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
- In PySpark, we now require Pandas 0.19.2 or higher if you want to use Pandas-related functionalities, such as `toPandas`, `createDataFrame` from a Pandas DataFrame, etc.
- In PySpark, the behavior of timestamp values for Pandas-related functionalities was changed to respect the session timezone. If you want to use the old behavior, you need to set the configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
- In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignored it and returned the original Dataset/DataFrame.
- Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
- Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returned a string regardless of the input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`.
- Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returned a string regardless of the input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`.