[SPARK-22221][DOCS] Adding User Documentation for Arrow #19575
@@ -1640,6 +1640,138 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a
You may run `./bin/spark-sql --help` for a complete list of all available
options.

# PySpark Usage Guide for Pandas with Apache Arrow

## Apache Arrow in Spark

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
data between JVM and Python processes. This currently is most beneficial to Python users who
work with Pandas/NumPy data. Its usage is not automatic and might require some minor
changes to configuration or code to take full advantage and ensure compatibility. This guide will
give a high-level description of how to use Arrow in Spark and highlight any differences when
working with Arrow-enabled data.

### Ensure PyArrow Installed

If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow
is installed and available on all cluster nodes. The currently supported version is 0.8.0.
You can install it using pip or conda from the conda-forge channel. See PyArrow
[installation](https://arrow.apache.org/docs/python/install.html) for details.

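As a quick sanity check, you can confirm that a compatible PyArrow is importable on a node; a
minimal sketch (assuming a standard Python environment, run the same check on executors if
worker environments differ):

```python
# Verify that PyArrow is installed and report its version
import pyarrow

print(pyarrow.__version__)  # expect 0.8.0, the currently supported version
```
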
## Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
using the call `toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with
`createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
the Spark configuration `spark.sql.execution.arrow.enabled` to `true`. This is disabled by default.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example dataframe_with_arrow python/sql/arrow.py %}
</div>
</div>

Using the above optimizations with Arrow will produce the same results as when Arrow is not
enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
data types are currently supported, and an error can be raised if a column has an unsupported type;
see [Supported Types](#supported-sql-types). If an error occurs during `createDataFrame()`,
Spark will fall back to creating the DataFrame without Arrow.

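In practice the round trip looks like the included example above; a minimal sketch (assuming an
existing `SparkSession` named `spark`):

```python
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers (disabled by default)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Pandas -> Spark and back again, transferred through Arrow
pdf = pd.DataFrame(np.random.rand(100, 3))
df = spark.createDataFrame(pdf)
result_pdf = df.select("*").toPandas()
```
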
## Pandas UDFs (a.k.a. Vectorized UDFs)

Pandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data and
Pandas to work with the data. A Pandas UDF is defined using `pandas_udf` as a decorator or to wrap
the function; no additional configuration is required. Currently, there are two types of
Pandas UDF: Scalar and Group Map.

### Scalar

Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
columns into batches, calling the function for each batch as a subset of the data, and then
concatenating the results together.

The following example shows how to create a scalar Pandas UDF that computes the product of two columns.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example scalar_pandas_udf python/sql/arrow.py %}
</div>
</div>

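The included example wraps an existing function with `pandas_udf`; the decorator form behaves the
same way. A minimal sketch (the function and column names are illustrative, and an existing
`spark` session is assumed):

```python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf

# Decorator form: the return type is declared up front; no extra configuration is needed
@pandas_udf("long")
def multiply_udf(a, b):
    # a and b arrive as pandas.Series of the same length
    return a * b

df = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))
df.select(multiply_udf(col("x"), col("x"))).show()
```
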
### Group Map
Group map Pandas UDFs are used with `groupBy().apply()`, which implements the "split-apply-combine" pattern.

> **Member:** @rxin WDYT about this name?
>
> **Member:** Which name? If you mean "split-apply-combine", I think it's fine - https://pandas.pydata.org/pandas-docs/stable/groupby.html
>
> **Member (Author):** I can change to whatever you guys like, but I think these two section names were made to reflect the different pandas_udf types - scalar and group map. Is that right @icexelloss?
>
> **Contributor:** That is correct. The names in this section match the enums in
>
> **Member:** @icexelloss we already agreed on the names when we wrote the blog, right?

Split-apply-combine consists of three steps:
* Split the data into groups by using `DataFrame.groupBy`.
* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
  input data contains all the rows and columns for each group.
* Combine the results into a new `DataFrame`.

To use `groupBy().apply()`, the user needs to define the following:
* A Python function that defines the computation for each group.
* A `StructType` object or a string that defines the schema of the output `DataFrame`.

> **Contributor:** We need to warn users that the Group Map Pandas UDF requires loading all the data of a group into memory, which is not controlled by
>
> **Member (Author):** Yeah, good point, I'll add that.

Note that all data for a group will be loaded into memory before the function is applied. This can
lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied to groups, and it is up to the user
to ensure that the grouped data will fit into the available memory.

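If the grouping key may be skewed, it can help to inspect group sizes before applying a group map
UDF; a minimal sketch (the `id` column name is illustrative):

```python
# A quick look at how many rows each group holds; one very large group is a sign
# that the corresponding pandas.DataFrame may not fit in executor memory.
df.groupBy("id").count().orderBy("count", ascending=False).show()
```
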
The following example shows how to use `groupBy().apply()` to subtract the mean from each value in the group.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example group_map_pandas_udf python/sql/arrow.py %}
</div>
</div>

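The included example declares the output schema as a DDL-formatted string; a `StructType` object
can be passed instead. A minimal sketch mirroring that example (assuming the same `df` with `id`
and `v` columns):

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType, LongType, StructField, StructType

# Equivalent to the string form "id long, v double"
schema = StructType([
    StructField("id", LongType()),
    StructField("v", DoubleType()),
])

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame containing all rows of one group
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupBy("id").apply(subtract_mean).show()
```
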
For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).

## Usage Notes

### Supported SQL Types

Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
`ArrayType` of `TimestampType`, and nested `StructType`.

### Setting Arrow Batch Size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf `spark.sql.execution.arrow.maxRecordsPerBatch`

> **Member:** Based on this description, it sounds like we should not use the number of records, but the size, right? cc @cloud-fan @ueshin too
>
> **Member:** No, both can be used where applicable.
>
> **Contributor:** We went with
>
> **Member (Author):** It's possible to estimate the size of the arrow buffer used, but it does make it more complicated to implement in Spark. I also wonder how useful this would be if the user hits memory problems. At least with a number of records, it's easy to understand and change.
>
> **Member:** The current approach just makes it hard for external users to tune. Now,
>
> **Member (Author):** Yes, it's not an ideal approach. I'm happy to make a JIRA to follow up and look into other ways to break up the batches, but that won't be in before 2.3. So does that mean our options here are (unless I'm not understanding internal/external conf correctly)? I think (2) is better because if the user hits memory issues, then they can at least find some way to adjust it.
>
> **Member:** Since it is too late to add a new conf for the 2.3 release, we can do it in the 2.4 release. In the 2.4 release, we can respect both confs. We just need to change the default of

to an integer that will determine the maximum number of rows for each batch. The default value is
10,000 records per batch. If the number of columns is large, the value should be adjusted
accordingly. Using this limit, each data partition will be made into one or more record batches for
processing.

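For example, a wide DataFrame might use a smaller batch size; a minimal sketch (the value chosen
here is illustrative):

```python
# Lower the Arrow record batch size, e.g. for rows with many columns
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
```
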
### Timestamp with Time Zone Semantics

Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
data is exported or displayed in Spark, the session time zone is used to localize the timestamp
values. The session time zone is set with the configuration `spark.sql.session.timeZone` and will
default to the JVM system local time zone if not set. Pandas uses a `datetime64` type with nanosecond
resolution, `datetime64[ns]`, with optional time zone on a per-column basis.

When timestamp data is transferred from Spark to Pandas, it will be converted to nanoseconds
and each column will be converted to the Spark session time zone and then localized to that time
zone, which removes the time zone and displays values as local time. This will occur
when calling `toPandas()` or `pandas_udf` with timestamp columns.

When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
occurs when calling `createDataFrame` with a Pandas DataFrame or when returning a timestamp from a
`pandas_udf`. These conversions are done automatically to ensure Spark will have data in the
expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
values will be truncated.

Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in `pandas_udf`s to get the best performance; see
[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.

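The session time zone that drives this localization can be set explicitly; a minimal sketch (the
zone and data are illustrative, and an existing `spark` session is assumed):

```python
import pandas as pd

# The session time zone controls how timestamp values are localized by toPandas()
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

pdf = pd.DataFrame({"ts": pd.to_datetime(["2017-11-01 12:00:00"])})
df = spark.createDataFrame(pdf)   # stored internally as UTC microseconds
result = df.toPandas()            # rendered in the session time zone, tz removed
```
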
# Migration Guide

## Upgrading From Spark SQL 2.2 to 2.3

@@ -1788,7 +1920,7 @@ options.
Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
- In PySpark, we now need Pandas 0.19.2 or higher if you want to use Pandas-related functionality, such as `toPandas`, `createDataFrame` from a Pandas DataFrame, etc.
- In PySpark, the behavior of timestamp values for Pandas-related functionality was changed to respect the session time zone. If you want to use the old behavior, you need to set the configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
- In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
- Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcast the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
- Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returned a string regardless of the input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`.
- Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returned a string regardless of the input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`.

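For instance, the boolean `fillna` behavior described above; a minimal sketch (the column name is illustrative):

```python
# As of Spark 2.3, a boolean fill value replaces nulls in boolean columns
df = spark.createDataFrame([(True,), (None,)], ["flag"])
df.na.fill(True).show()
# +----+
# |flag|
# +----+
# |true|
# |true|
# +----+
```
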
examples/src/main/python/sql/arrow.py (new file)
@@ -0,0 +1,129 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating Arrow in Spark.
Run with:
  ./bin/spark-submit examples/src/main/python/sql/arrow.py
"""

from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.sql.utils import require_minimum_pandas_version, require_minimum_pyarrow_version

require_minimum_pandas_version()
require_minimum_pyarrow_version()


def dataframe_with_arrow_example(spark):
    # $example on:dataframe_with_arrow$
    import numpy as np
    import pandas as pd

    # Enable Arrow-based columnar data transfers
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

    # Generate a Pandas DataFrame
    pdf = pd.DataFrame(np.random.rand(100, 3))

    # Create a Spark DataFrame from a Pandas DataFrame using Arrow
    df = spark.createDataFrame(pdf)

    # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
    result_pdf = df.select("*").toPandas()
    # $example off:dataframe_with_arrow$
    print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))


def scalar_pandas_udf_example(spark):
    # $example on:scalar_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType

    # Declare the function and create the UDF
    def multiply_func(a, b):
        return a * b

    multiply = pandas_udf(multiply_func, returnType=LongType())

    # The function for a pandas_udf should be able to execute with local Pandas data
    x = pd.Series([1, 2, 3])
    print(multiply_func(x, x))
    # 0    1
    # 1    4
    # 2    9
    # dtype: int64

    # Create a Spark DataFrame, 'spark' is an existing SparkSession
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

    # Execute function as a Spark vectorized UDF
    df.select(multiply(col("x"), col("x"))).show()
    # +-------------------+
    # |multiply_func(x, x)|
    # +-------------------+
    # |                  1|
    # |                  4|
    # |                  9|
    # +-------------------+
    # $example off:scalar_pandas_udf$

def group_map_pandas_udf_example(spark):
    # $example on:group_map_pandas_udf$
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    df.groupby("id").apply(subtract_mean).show()
    # +---+----+
    # | id|   v|
    # +---+----+
    # |  1|-0.5|
    # |  1| 0.5|
    # |  2|-3.0|
    # |  2|-1.0|
    # |  2| 4.0|
    # +---+----+
    # $example off:group_map_pandas_udf$

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    print("Running Pandas to/from conversion example")
    dataframe_with_arrow_example(spark)
    print("Running pandas_udf scalar example")
    scalar_pandas_udf_example(spark)
    print("Running pandas_udf group map example")
    group_map_pandas_udf_example(spark)

    spark.stop()

> **Member (Author):** I took another spin at this section and below to hopefully make it a bit clearer and indicate that `pandas_udf` doesn't need any configuration to be set. I also capitalized "Pandas UDF" to make it consistent, since we use it like a proper noun.