From ab95e6421a1d9760477f435f6442827d0a6483ee Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Fri, 7 Jun 2024 10:14:07 +0900 Subject: [PATCH 1/4] Make StreamingQueryListener.spark settable --- python/pyspark/sql/streaming/listener.py | 5 +++++ .../tests/streaming/test_streaming_listener.py | 17 +++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py index c1c9dce047319..c6173951b0b36 100644 --- a/python/pyspark/sql/streaming/listener.py +++ b/python/pyspark/sql/streaming/listener.py @@ -75,6 +75,11 @@ def spark(self) -> Optional["SparkSession"]: # type: ignore[name-defined] # noq else: return None + @spark.setter + def spark(self, session): + # For backward compatibility + self._sparkSession = session + def _init_listener_id(self) -> None: self._id = str(uuid.uuid4()) diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py index 15f5575d36479..762fc335b56ad 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py @@ -592,6 +592,23 @@ def test_streaming_query_progress_fromJson(self): self.assertEqual(sink.numOutputRows, -1) self.assertEqual(sink.metrics, {}) + def test_spark_property_in_listener(self): + # SPARK-48560: Make StreamingQueryListener.spark settable + class TestListener(StreamingQueryListener): + def __init__(self, session): + self.spark = session + + def onQueryStarted(self, event): + pass + + def onQueryProgress(self, event): + pass + + def onQueryTerminated(self, event): + pass + + self.assertEqual(TestListener(self.spark).spark, self.spark) + if __name__ == "__main__": import unittest From 562ef6fcd14a889e5e1ee5af92ec743810301228 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Sat, 8 Jun 2024 13:05:48 -0700 Subject: [PATCH 2/4] address a comment --- python/pyspark/sql/streaming/listener.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py index c6173951b0b36..92f3b574b0284 100644 --- a/python/pyspark/sql/streaming/listener.py +++ b/python/pyspark/sql/streaming/listener.py @@ -26,6 +26,7 @@ if TYPE_CHECKING: from py4j.java_gateway import JavaObject + from pyspark.sql import SparkSession class StreamingQueryListener(ABC): @@ -64,19 +65,17 @@ class StreamingQueryListener(ABC): """ def _set_spark_session( - self, spark: "SparkSession" # type: ignore[name-defined] # noqa: F821 + self, session: "SparkSession" # type: ignore[name-defined] # noqa: F821 ) -> None: - self._sparkSession = spark + if self.spark is not None: + self.spark = session @property def spark(self) -> Optional["SparkSession"]: # type: ignore[name-defined] # noqa: F821 - if hasattr(self, "_sparkSession"): - return self._sparkSession - else: - return None + return getattr(self, "_sparkSession", None) @spark.setter - def spark(self, session): + def spark(self, session: "SparkSession") -> None: # For backward compatibility self._sparkSession = session From 81c0c0637a159aa1902c6afbd77bcf335f2793fc Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Sat, 8 Jun 2024 13:50:26 -0700 Subject: [PATCH 3/4] fixup --- python/pyspark/sql/streaming/listener.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py index 92f3b574b0284..af27146d8ca47 100644 --- a/python/pyspark/sql/streaming/listener.py +++ b/python/pyspark/sql/streaming/listener.py @@ -26,7 +26,6 @@ if TYPE_CHECKING: from py4j.java_gateway import JavaObject - from pyspark.sql import SparkSession class StreamingQueryListener(ABC): @@ -75,7 +74,7 @@ def spark(self) -> Optional["SparkSession"]: # type: ignore[name-defined] # noq return getattr(self, "_sparkSession", None) @spark.setter - def spark(self, session: "SparkSession") -> None: + def spark(self, session: "SparkSession") -> None: # type: ignore[name-defined] # noqa: F821 # For backward compatibility self._sparkSession = session From d52fc2e2bfeab0e723b5ce7bd52c916edd42612c Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Sun, 9 Jun 2024 08:14:23 -0700 Subject: [PATCH 4/4] fixup --- python/pyspark/sql/streaming/listener.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py index af27146d8ca47..2aa63cdb91ab6 100644 --- a/python/pyspark/sql/streaming/listener.py +++ b/python/pyspark/sql/streaming/listener.py @@ -66,7 +66,7 @@ class StreamingQueryListener(ABC): def _set_spark_session( self, session: "SparkSession" # type: ignore[name-defined] # noqa: F821 ) -> None: - if self.spark is not None: + if self.spark is None: self.spark = session @property