-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-40435][SS][PYTHON] Add test suites for applyInPandasWithState in PySpark #37894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-40435][SS][PYTHON] Add test suites for applyInPandasWithState in PySpark #37894
Conversation
791c915 to
4caff5e
Compare
0ba441d to
a9bbe90
Compare
|
Just updated the PR description. Will rebase this PR once #37893 is merged. |
|
Hi @HeartSaVioR, you may have referenced the wrong PR in the description |
|
@xiaonanyang-db Thanks for noticing! Just fixed. |
bf65ea3 to
4a9763e
Compare
|
The change of this PR is only the last commit from the commit list. All others come from #37893. |
42e7c59 to
cbc8937
Compare
|
cc. @HyukjinKwon @alex-balikov |
### What changes were proposed in this pull request?
This PR proposes to introduce the new API `applyInPandasWithState` in PySpark, which provides the functionality to perform arbitrary stateful processing in Structured Streaming.
This will be a pair API with applyInPandas - applyInPandas in PySpark covers the use case of flatMapGroups in Scala/Java API, applyInPandasWithState in PySpark covers the use case of flatMapGroupsWithState in Scala/Java API.
The signature of API follows:
```
# call this function after groupBy
def applyInPandasWithState(
self,
func: "PandasGroupedMapFunctionWithState",
outputStructType: Union[StructType, str],
stateStructType: Union[StructType, str],
outputMode: str,
timeoutConf: str,
) -> DataFrame
```
and the signature of user function follows:
```
def func(
key: Tuple,
pdf_iter: Iterator[pandas.DataFrame],
state: GroupStateImpl
) -> Iterator[pandas.DataFrame]
```
(Please refer the code diff for function doc of new function.)
Major design choices which differ from existing APIs:
1. The new API is untyped, while flatMapGroupsWithState in typed API.
This is based on the nature of Python language - it's really duck typing and type definition is just a hint. We don't have the implementation of typed API for PySpark DataFrame.
This leads us to design the API to be untyped, meaning, all types for (input, state, output) should be Row-compatible. While we don't require end users to deal with `Row` directly, the model they will use for state and output must be convertible to Row with default encoder. If they want the python type for state which is not compatible with Row (e.g. custom class), they need to pickle and use BinaryType to store it.
This requires end users to specify the type of state and output via Spark SQL schema in the method.
Note that this helps to ensure compatibility for state data across Spark versions, as long as the encoders for 1) python type -> python Row and 2) python Row -> UnsafeRow are not changed. We won't change the underlying data layout for UnsafeRow, as it will break all of existing stateful query.
2. The new API will produce Pandas DataFrame to user function, while flatMapGroupsWithState produces iterator of rows.
We decided to follow the user experience applyInPandas provides for both consistency and performance (Arrow batching, vectorization, etc). This leads us to design the user function to leverage pandas DataFrame rather than iterator of rows. While this leads inconsistency of the UX from the Scala/Java API, we don't think this will come up as a problem since Pandas is considered as de-facto standard for Python data scientists.
3. The new API will produce iterator of Pandas DataFrame to user function and also require to return iterator of Pandas DataFrame to address scalability.
There is known limitation of applyInPandas, scalability. It basically requires data in a specific group to be fit into memory. During the design phase of new API, we decided to address the scalability rather than inheriting the limitation.
To address the scalability, we tweak the user function to receive an iterator (generator) of Pandas DataFrame instead of a single Pandas DataFrame, and also return an iterator (generator) of Pandas DataFrame. We think it does not hurt the UX too much, as for-each and yield would be enough to deal with the requirement of dealing with iterator.
Implementation perspective, we split the data in a specific group to multiple chunks, which each chunk is stored and sent as "an" Arrow RecordBatch, and then finally materialized to "a" pandas DataFrame. This way, as long as end users don't materialize lots of pandas DataFrames from the iterator at the same time, only one chunk will be materialized into memory which is scalable. Similar logic applies to the output of user function, hence scalable as well.
4. The new API also bin-packs the data with multiple groups into "an" Arrow RecordBatch.
Given the API is mainly used for streaming workload, it could be high likely that the volume of data in a specific group may not be huge enough to leverage the benefit of Arrow columnar batching, which would hurt the performance. To address this, we also do the opposite thing what we do for scalability, bin-pack. That said, an Arrow RecordBatch can contain data for multiple groups, as well as a part of data for specific group. This address both aspects of concerns together, scalability and performance.
Note that we are not implementing all of features Scala/Java API provide from the initial phase. e.g. Support for batch query and support for initial state will be left as TODO.
### Why are the changes needed?
PySpark users don't have a way to perform arbitrary stateful processing in Structured Streaming and being forced to use either Java or Scala which is unacceptable for users in many cases. This PR enables PySpark users to deal with it without moving to Java/Scala world.
### Does this PR introduce _any_ user-facing change?
Yes. We are exposing new public API in PySpark which performs arbitrary stateful processing.
### How was this patch tested?
N/A. We will make sure test suites are constructed via E2E manner under [SPARK-40431](https://issues.apache.org/jira/browse/SPARK-40431) - #37894
Closes #37893 from HeartSaVioR/SPARK-40434-on-top-of-SPARK-40433-SPARK-40432.
Lead-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
cbc8937 to
74d0ae8
Compare
|
Thanks! Merging to master. |
…`PythonUDFSuite` and `ContinuousSuite` ### What changes were proposed in this pull request? #37894 changed the preconditions for the following two tests from `assume(shouldTestGroupedAggPandasUDFs)` to `assume(shouldTestPythonUDFs)`: - `SPARK-39962: Global aggregation of Pandas UDF should respect the column order` in `PythonUDFSuite` - `continuous mode with various UDFs - Scalar Pandas UDF` in `ContinuousSuite` but this change this change will cause test failure if `pandas` is not installed, so this pr change the test preconditions from `assume(shouldTestPythonUDFs)` to `assume(shouldTestPandasUDFs)`. ### Why are the changes needed? Fix test precondition of `PythonUDFSuite` and `ContinuousSuite` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Manual test, `pandas` is not installed: ``` build/sbt clean "sql/testOnly org.apache.spark.sql.execution.python.PythonUDFSuite" build/sbt clean "sql/testOnly org.apache.spark.sql.streaming.continuous.ContinuousSuite" ``` **Before** PythonUDFSuite ``` [info] - SPARK-39962: Global aggregation of Pandas UDF should respect the column order *** FAILED *** (799 milliseconds) [info] java.lang.RuntimeException: Python executable [python3] and/or pyspark are unavailable. [info] at org.apache.spark.sql.IntegratedUDFTestUtils$.pandasGroupedAggFunc$lzycompute(IntegratedUDFTestUtils.scala:236) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$.org$apache$spark$sql$IntegratedUDFTestUtils$$pandasGroupedAggFunc(IntegratedUDFTestUtils.scala:217) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$TestGroupedAggPandasUDF.udf$lzycompute(IntegratedUDFTestUtils.scala:433) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$TestGroupedAggPandasUDF.udf(IntegratedUDFTestUtils.scala:430) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$TestGroupedAggPandasUDF.apply(IntegratedUDFTestUtils.scala:444) [info] at org.apache.spark.sql.execution.python.PythonUDFSuite.$anonfun$new$9(PythonUDFSuite.scala:82) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ``` and ContinuousSuite ``` [info] - continuous mode with various UDFs - Scalar Pandas UDF *** FAILED *** (715 milliseconds) [info] java.lang.RuntimeException: Python executable [python3] and/or pyspark are unavailable. [info] at org.apache.spark.sql.IntegratedUDFTestUtils$.pandasFunc$lzycompute(IntegratedUDFTestUtils.scala:214) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$.org$apache$spark$sql$IntegratedUDFTestUtils$$pandasFunc(IntegratedUDFTestUtils.scala:194) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$TestScalarPandasUDF$$anon$2.<init>(IntegratedUDFTestUtils.scala:382) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$TestScalarPandasUDF.udf$lzycompute(IntegratedUDFTestUtils.scala:379) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$TestScalarPandasUDF.udf(IntegratedUDFTestUtils.scala:379) [info] at org.apache.spark.sql.IntegratedUDFTestUtils$TestScalarPandasUDF.apply(IntegratedUDFTestUtils.scala:404) [info] at org.apache.spark.sql.streaming.continuous.ContinuousSuite.$anonfun$new$24(ContinuousSuite.scala:289) ``` **After** PythonUDFSuite ``` [info] Run completed in 11 seconds, 278 milliseconds. [info] Total number of tests run: 4 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 4, failed 0, canceled 1, ignored 0, pending 0 [info] All tests passed. [success] Total time: 72 s (01:12), completed 2022-9-28 15:46:40 ``` and ContinuousSuite ``` [info] Run completed in 33 seconds, 197 milliseconds. [info] Total number of tests run: 13 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 13, failed 0, canceled 1, ignored 0, pending 0 [info] All tests passed. [success] Total time: 64 s (01:04), completed 2022-9-28 15:49:45 ``` Closes #38028 from LuciferYang/SPARK-40435-FOLLOWUP. Lead-authored-by: yangjie01 <[email protected]> Co-authored-by: YangJie <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
This PR adds the test suites for #37893, applyInPandasWithState. The new test suite mostly ports E2E test cases from existing flatMapGroupsWithState.
Why are the changes needed?
Tests are missing in #37893 by intention to reduce the size of change, and this PR fills the gap.
Does this PR introduce any user-facing change?
No, test only.
How was this patch tested?
New test suites.