Skip to content

Conversation

@pengzhon-db
Copy link
Contributor

@pengzhon-db pengzhon-db commented Apr 11, 2023

What changes were proposed in this pull request?

This change adds applyInPandasWithState support for Spark connect.
Example (try with local mode ./bin/pyspark --remote "local[*]"):

>>> from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
>>> from pyspark.sql.types import (
...     LongType,
...     StringType,
...     StructType,
...     StructField,
...     Row,
... )
>>> import pandas as pd
>>> output_type = StructType(
...     [StructField("key", StringType()), StructField("countAsString", StringType())]
... )
>>> state_type = StructType([StructField("c", LongType())])
>>> def func(key, pdf_iter, state):
...     total_len = 0
...     for pdf in pdf_iter:
...         total_len += len(pdf)
...     state.update((total_len,))
...     yield pd.DataFrame({"key": [key[0]], "countAsString": [str(total_len)]})
...
>>>
>>> input_path = "/Users/peng.zhong/tmp/applyInPandasWithState"
>>> df = spark.readStream.format("text").load(input_path)
>>> q = (
...       df.groupBy(df["value"])
...       .applyInPandasWithState(
...           func, output_type, state_type, "Update", GroupStateTimeout.NoTimeout
...       )
...       .writeStream.queryName("this_query")
...       .format("memory")
...       .outputMode("update")
...       .start()
...   )
>>>
>>> q.status
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
>>>
>>> spark.sql("select * from this_query").show()
+-----+-------------+
|  key|countAsString|
+-----+-------------+
|hello|            1|
| this|            1|
+-----+-------------+

Why are the changes needed?

This change adds an API support for spark connect.

Does this PR introduce any user-facing change?

This change adds an API support for spark connect.

How was this patch tested?

Manually tested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why it generates this. Will look into it

Copy link

@rangadi rangadi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes might be due to difference in python code generator.
Cc: @HyukjinKwon (are we planning to generated these at build time to avoid issues like this?).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's exactly what I have been asking around .. but seems that's a bit difficult.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I saw similar annotation generated in another PR, but it was removed in some way then in that PR.

Copy link
Contributor

@WweiL WweiL Apr 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see other part of the code use quoted types, i.e. Union["StructType", str]. Maybe we should also do that for code consistency?
Also doing that seems to help with forward reference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current function uses Union[StructType, str]. Also, majority of places use this in other files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using quotes is only necessary when the type cannot be imported property because of cyclic references or if the type is not defined yet.

Copy link
Contributor

@WweiL WweiL Apr 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not very sure if we could do it like this? Above in UserDefinedFunction, it takes the same outputStructType, but handles it differently

self.returnType: DataType = (

and then
if isinstance(self._output_type, UnparsedDataType):
parsed = session._analyze(
method="ddl_parse", ddl_string=self._output_type.data_type_string
).parsed

Should we also follow this pattern?

Maybe @ueshin has more context?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to port tests tests as well. We need to add connect version of test_pandas_grouped_map_with_state.py

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WweiL The return type of udf is a bit different. Here I am following the way how DataStreamReader handles schema. The server will parse the schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rangadi Added versionchanged

@WweiL
Copy link
Contributor

WweiL commented Apr 11, 2023

Also I think we need to add unit test for this by reusing tests here #37894

You can follow my PR to

  1. create a new mixin class that contains all test cases from the original one but don't extend ReusedSQLTestCase, and
  2. create a new class below with the original name and extend this mixin class and ReusedSQLTestCase
  3. create a parity test class that extends this mixin class and ReusedConnectTestCase

@github-actions github-actions bot added the BUILD label Apr 12, 2023
@pengzhon-db
Copy link
Contributor Author

Also I think we need to add unit test for this by reusing tests here #37894

You can follow my PR to

  1. create a new mixin class that contains all test cases from the original one but don't extend ReusedSQLTestCase, and
  2. create a new class below with the original name and extend this mixin class and ReusedSQLTestCase
  3. create a parity test class that extends this mixin class and ReusedConnectTestCase

@WweiL I added a test file test_parity_pandas_grouped_map_with_state.py. However, I had to skip all tests due to spark.streams not supported in connect for now

@pengzhon-db pengzhon-db force-pushed the connect_applyInPandasWithState branch from abd49b3 to e4fe03d Compare April 13, 2023 18:59
@WweiL
Copy link
Contributor

WweiL commented Apr 14, 2023

Seems that there is lint error, you can run PYTHON_EXECUTABLE=python3.9 ./dev/lint-python or just ./dev/lint-python before commit to make sure

@pengzhon-db
Copy link
Contributor Author

@HyukjinKwon can u help merge this?

@HyukjinKwon
Copy link
Member

Merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants