
Commit 47bfc21

committed
added section on pandas_udf, setting batch size
1 parent 5699d1b commit 47bfc21

File tree

1 file changed: +61 -4 lines changed

docs/sql-programming-guide.md

Lines changed: 61 additions & 4 deletions
@@ -1667,9 +1667,9 @@ To use Arrow when executing these calls, it first must be enabled by setting the
'spark.sql.execution.arrow.enabled' to 'true'; this is disabled by default.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import numpy as np
import pandas as pd

@@ -1683,17 +1683,66 @@ pdf = pd.DataFrame(np.random.rand(100, 3))
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame to a local Pandas DataFrame
selpdf = df.select(" * ").toPandas()

{% endhighlight %}
</div>
</div>

Using the above optimizations with Arrow will produce the same results as when Arrow is not
enabled. Not all Spark data types are currently supported and an error will be raised if a column
has an unsupported type; see [Supported Types](#supported-types).
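
As a rough illustration of that behavior, the sketch below creates a column of an unsupported type
(a `MapType` inferred from a Python dict) and attempts the conversion. It assumes an active
SparkSession named `spark` with Arrow enabled; the column name and data are made up for the example.

{% highlight python %}
# Sketch only: a MapType column is inferred from the Python dict below.
# Per the note above, converting it with Arrow enabled is expected to raise an error.
map_df = spark.createDataFrame([({"a": 1},)], ["m"])

try:
    map_df.toPandas()
except Exception as e:
    print("Conversion failed: %s" % e)
{% endhighlight %}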

## How to Write Vectorized UDFs

A vectorized UDF is similar to a standard UDF in Spark except that the inputs and output of the
function will be Pandas Series, which allows the function to be composed with vectorized
operations. The function can then be run very efficiently in Spark, where data is sent in batches
to Python and the function is executed using Pandas Series as input. The expected output of the
function is also a Pandas Series of the same length as the inputs. A vectorized UDF is declared
with `pandas_udf`; no additional configuration is required.

The following example shows how to create a vectorized UDF that computes the product of two columns.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

{% endhighlight %}
</div>
</div>
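
Depending on the Spark version, `pandas_udf` can also be applied as a decorator. The brief sketch
below shows a declaration equivalent to wrapping `multiply_func` above, assuming the same
SparkSession and DataFrame `df`.

{% highlight python %}
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declares the same vectorized UDF as above, written with pandas_udf as a decorator
@pandas_udf(returnType=LongType())
def multiply_udf(a, b):
    return a * b

df.select(multiply_udf(col("x"), col("x"))).show()
{% endhighlight %}
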
## GroupBy-Apply UDFs

## Usage Notes
@@ -1703,6 +1752,14 @@ enabled.
Currently, all Spark SQL data types are supported except `MapType`, `ArrayType` of `TimestampType`, and
nested `StructType`.

### Setting Arrow Batch Size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out-of-memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch"
to an integer that determines the maximum number of rows for each batch. With this limit, each
data partition will be made into one or more record batches for processing.
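
For example, the batch size could be set as follows before converting a DataFrame. This is a
minimal sketch that assumes an active SparkSession named `spark`, a DataFrame `df`, and an
arbitrary illustrative limit of 10,000 rows per batch.

{% highlight python %}
# Limit each Arrow record batch to at most 10,000 rows (value chosen for illustration)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")

# Each data partition of `df` is now converted as one or more batches of up to 10,000 rows
pdf = df.toPandas()
{% endhighlight %}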

### Date and Timestamp Semantics

Spark internally stores timestamps as UTC values and timestamp data that is brought in without
