diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 25426948c275..89d558ce8d40 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1661,7 +1661,7 @@ installed and available on all cluster node Python environments. The current sup
 
 ## How to Enable for Conversion to/from Pandas
 
-Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call 
+Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
 `toPandas()` and when creating a Spark DataFrame from Pandas with `createDataFrame(pandas_df)`.
 To use Arrow when executing these calls, it first must be enabled by setting the Spark conf
 'spark.sql.execution.arrow.enabled' to 'true', this is disabled by default.
 
@@ -1743,7 +1743,104 @@ df.select(multiply(col("x"), col("x"))).show()
 
-## GroupBy-Apply UDFs
+## GroupBy-Apply
+
+GroupBy-Apply implements the "split-apply-combine" pattern. Split-apply-combine consists of three steps:
+
+* Split the data into groups by using `DataFrame.groupBy`.
+* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
+  input data contains all the rows and columns for each group.
+* Combine the results into a new `DataFrame`.
+
+To use GroupBy-Apply, the user needs to define:
+* A Python function that defines the computation for each group.
+* A `StructType` object or a string that defines the schema of the output `DataFrame` (a `StructType`
+  sketch follows the first example below).
+
+Examples:
+
+The first example shows a simple use case: subtracting the mean from each value in the group.
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+df = spark.createDataFrame(
+    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+    ("id", "v"))
+
+@pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+def subtract_mean(pdf):
+    v = pdf.v
+    return pdf.assign(v=v - v.mean())
+
+df.groupby("id").apply(subtract_mean).show()
+# +---+----+
+# | id|   v|
+# +---+----+
+# |  1|-0.5|
+# |  1| 0.5|
+# |  2|-3.0|
+# |  2|-1.0|
+# |  2| 4.0|
+# +---+----+
+
+{% endhighlight %}
+</div>
+</div>
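+
+The schema in the example above is given as a DDL-formatted string. As a minimal sketch, the same
+output schema for `subtract_mean` could also be expressed as a `StructType` object instead of the
+string form:
+
+{% highlight python %}
+from pyspark.sql.types import StructType, StructField, LongType, DoubleType
+
+# Alternative way to express the "id long, v double" schema used above.
+schema = StructType([
+    StructField("id", LongType()),
+    StructField("v", DoubleType())
+])
+
+@pandas_udf(schema, PandasUDFType.GROUP_MAP)
+def subtract_mean(pdf):
+    v = pdf.v
+    return pdf.assign(v=v - v.mean())
+{% endhighlight %}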
+
+The second example is more complicated. It shows how to run an OLS linear regression
+for each group using statsmodels. For each group, we calculate beta b = (b1, b2) for X = (x1, x2)
+according to the statistical model Y = bX + c.
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% highlight python %}
+import pandas as pd
+import statsmodels.api as sm
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+df = spark.createDataFrame(
+    [(1, 1.0, 1.0, -3.0),
+     (1, 2.0, 2.0, -6.0),
+     (2, 3.0, 6.0, -6.0),
+     (2, 5.0, 10.0, -10.0),
+     (2, 10.0, 20.0, -20.0)],
+    ("id", "y", "x1", "x2"))
+
+group_column = 'id'
+y_column = 'y'
+x_columns = ['x1', 'x2']
+schema = df.select(group_column, *x_columns).schema
+
+# Input/output are both a pandas.DataFrame
+@pandas_udf(schema, PandasUDFType.GROUP_MAP)
+def ols(pdf):
+    group_key = pdf[group_column].iloc[0]
+    Y = pdf[y_column]
+    X = pdf[x_columns]
+    X = sm.add_constant(X)
+    model = sm.OLS(Y, X).fit()
+
+    print(model.params)
+    print(model.summary())
+    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]])
+
+beta = df.groupby(group_column).apply(ols)
+
+beta.show()
+# +---+-------------------+--------------------+
+# | id|                 x1|                  x2|
+# +---+-------------------+--------------------+
+# |  1|0.10000000000000003| -0.3000000000000001|
+# |  2|0.24999999999999997|-0.24999999999999997|
+# +---+-------------------+--------------------+
+
+{% endhighlight %}
+</div>
+</div>
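+
+As a quick sanity check, the coefficients for a single group can be reproduced locally with
+statsmodels alone. A sketch using the rows where id == 2 from the data above:
+
+{% highlight python %}
+import pandas as pd
+import statsmodels.api as sm
+
+# Rows of group id == 2, copied from the example data above.
+sample = pd.DataFrame({"y": [3.0, 5.0, 10.0],
+                       "x1": [6.0, 10.0, 20.0],
+                       "x2": [-6.0, -10.0, -20.0]})
+
+X = sm.add_constant(sample[["x1", "x2"]])
+model = sm.OLS(sample["y"], X).fit()
+print(model.params)  # x1 and x2 correspond to the id == 2 row of beta.show()
+{% endhighlight %}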
+
+For detailed usage, please see `pyspark.sql.functions.pandas_udf` and
+`pyspark.sql.GroupedData.apply`.
 
 ## Usage Notes
 
@@ -1786,7 +1883,7 @@ values will be truncated.
 Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
 different than a Pandas timestamp. It is recommended to use Pandas time series functionality when
 working with timestamps in `pandas_udf`s to get the best performance, see
-[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details. 
+[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
 
 # Migration Guide
 
@@ -1936,7 +2033,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
 - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
 - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
- - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame. 
+ - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
 - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
 - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`.
 - Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`.