
Commit 99943bf

[SPARK-17731][SQL][STREAMING][FOLLOWUP] Refactored StreamingQueryListener APIs for branch-2.0
This is the branch-2.0 PR of #15530 to make the APIs consistent with master. Since these APIs are experimental and not directly user-facing (StreamingQueryListener is an advanced Structured Streaming API), it's okay to change them in branch-2.0.

## What changes were proposed in this pull request?

As per rxin's request, here are further API changes:
- Changed `Stream(Started/Progress/Terminated)` events to `Stream*Event`
- Changed the fields in `StreamingQueryListener.on***` from `query*` to `event`

## How was this patch tested?

Existing unit tests.

Author: Tathagata Das <[email protected]>

Closes #15535 from tdas/SPARK-17731-1-branch-2.0.
1 parent f6b8793 commit 99943bf
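
To make the renamed API concrete, here is a minimal sketch of a listener written against the event classes introduced by this change (`QueryStartedEvent`, `QueryProgressEvent`, `QueryTerminatedEvent`). The listener class and log messages are illustrative, not part of this patch; `queryStatus` and `exception` are the fields these events expose in branch-2.0 as shown in the diffs below.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener illustrating the renamed event classes and the `event` parameter names.
class ConsoleQueryListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    // Each event carries the StreamingQueryStatus snapshot that was posted with it.
    println(s"Query started: ${event.queryStatus}")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(s"Query progress: ${event.queryStatus}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    // `exception` is Some(error string) if the query failed, None on a clean stop.
    println(s"Query terminated: ${event.queryStatus}, error = ${event.exception}")
  }
}
```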

7 files changed: +43 additions, -31 deletions


project/MimaExcludes.scala

Lines changed: 11 additions & 1 deletion

@@ -788,6 +788,7 @@ object MimaExcludes {
       // SPARK-16240: ML persistence backward compatibility for LDA
       ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
     ) ++ Seq(
+      // [SPARK-17731][SQL][Streaming] Metrics for structured streaming
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryInfo"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.queryInfo"),
@@ -796,7 +797,16 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.SourceStatus.offsetDesc"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceStatus.this"),
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status")
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStarted"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated")
     )
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 8 additions & 7 deletions

@@ -165,7 +165,7 @@ class StreamExecution(
     new Path(new Path(checkpointRoot), name).toUri.toString

   /**
-   * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event
+   * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
    * has been posted to all the listeners.
    */
   def start(): Unit = {
@@ -177,9 +177,10 @@
   /**
    * Repeatedly attempts to run batches as data arrives.
    *
-   * Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted
-   * such that listeners are guaranteed to get a start event before a termination. Furthermore, this
-   * method also ensures that [[QueryStarted]] event is posted before the `start()` method returns.
+   * Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are
+   * posted such that listeners are guaranteed to get a start event before a termination.
+   * Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the
+   * `start()` method returns.
    */
   private def runBatches(): Unit = {
     try {
@@ -190,7 +191,7 @@
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
       updateStatus()
-      postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw exception.
+      postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception.

       // Unblock starting thread
       startLatch.countDown()
@@ -232,7 +233,7 @@
         // Update metrics and notify others
         streamMetrics.reportTriggerFinished()
         updateStatus()
-        postEvent(new QueryProgress(currentStatus))
+        postEvent(new QueryProgressEvent(currentStatus))
         isTerminated
       })
     } catch {
@@ -260,7 +261,7 @@
       // Notify others
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
       postEvent(
-        new QueryTerminated(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
+        new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
       terminationLatch.countDown()
     }
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala

Lines changed: 4 additions & 4 deletions

@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
    */
  def post(event: StreamingQueryListener.Event) {
    event match {
-      case s: QueryStarted =>
+      case s: QueryStartedEvent =>
        postToAll(s)
      case _ =>
        sparkListenerBus.post(event)
@@ -59,11 +59,11 @@
      listener: StreamingQueryListener,
      event: StreamingQueryListener.Event): Unit = {
    event match {
-      case queryStarted: QueryStarted =>
+      case queryStarted: QueryStartedEvent =>
        listener.onQueryStarted(queryStarted)
-      case queryProgress: QueryProgress =>
+      case queryProgress: QueryProgressEvent =>
        listener.onQueryProgress(queryProgress)
-      case queryTerminated: QueryTerminated =>
+      case queryTerminated: QueryTerminatedEvent =>
        listener.onQueryTerminated(queryTerminated)
      case _ =>
    }

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

Lines changed: 7 additions & 7 deletions

@@ -41,24 +41,24 @@ abstract class StreamingQueryListener {
    * don't block this method as it will block your query.
    * @since 2.0.0
    */
-  def onQueryStarted(queryStarted: QueryStarted): Unit
+  def onQueryStarted(event: QueryStartedEvent): Unit

  /**
   * Called when there is some status update (ingestion rate updated, etc.)
   *
   * @note This method is asynchronous. The status in [[StreamingQuery]] will always be
   * latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
   * may be changed before/when you process the event. E.g., you may find [[StreamingQuery]]
-   * is terminated when you are processing [[QueryProgress]].
+   * is terminated when you are processing [[QueryProgressEvent]].
   * @since 2.0.0
   */
-  def onQueryProgress(queryProgress: QueryProgress): Unit
+  def onQueryProgress(event: QueryProgressEvent): Unit

  /**
   * Called when a query is stopped, with or without error.
   * @since 2.0.0
   */
-  def onQueryTerminated(queryTerminated: QueryTerminated): Unit
+  def onQueryTerminated(event: QueryTerminatedEvent): Unit
 }


@@ -84,15 +84,15 @@ object StreamingQueryListener {
   * @since 2.0.0
   */
  @Experimental
-  class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event
+  class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event

  /**
   * :: Experimental ::
   * Event representing any progress updates in a query
   * @since 2.0.0
   */
  @Experimental
-  class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) extends Event
+  class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event

  /**
   * :: Experimental ::
@@ -104,7 +104,7 @@
   * @since 2.0.0
   */
  @Experimental
-  class QueryTerminated private[sql](
+  class QueryTerminatedEvent private[sql](
      val queryStatus: StreamingQueryStatus,
      val exception: Option[String]) extends Event
 }
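
For reference, a hedged sketch of how a listener built on these renamed classes might be wired into an application; `addListener`/`removeListener` on `spark.streams` are existing StreamingQueryManager methods, while the object name and application name below are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

object TerminationWatcher {
  def main(args: Array[String]): Unit = {
    // Illustrative session setup; any existing SparkSession works the same way.
    val spark = SparkSession.builder().appName("termination-watcher").getOrCreate()

    val listener = new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
        // QueryTerminatedEvent.exception is Some(error string) on failure, None otherwise.
        event.exception match {
          case Some(err) => println(s"Query terminated with error: $err")
          case None      => println("Query terminated cleanly")
        }
      }
    }

    spark.streams.addListener(listener)
    // ... start and await streaming queries here ...
    spark.streams.removeListener(listener)
  }
}
```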

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala

Lines changed: 3 additions & 3 deletions

@@ -676,20 +676,20 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    }


-    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
      asyncTestWaiter {
        startStatus = queryStarted.queryStatus
      }
    }

-    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
      asyncTestWaiter {
        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
        synchronized { progressStatuses += queryProgress.queryStatus }
      }
    }

-    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
      asyncTestWaiter {
        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
        terminationStatus = queryTerminated.queryStatus

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 7 additions & 6 deletions

@@ -177,30 +177,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
  }

  test("QueryStarted serialization") {
-    val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus)
+    val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
    val json = JsonProtocol.sparkEventToJson(queryStarted)
    val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryStarted]
+      .asInstanceOf[StreamingQueryListener.QueryStartedEvent]
    assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
  }

  test("QueryProgress serialization") {
-    val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus)
+    val queryProcess = new StreamingQueryListener.QueryProgressEvent(
+      StreamingQueryStatus.testStatus)
    val json = JsonProtocol.sparkEventToJson(queryProcess)
    val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryProgress]
+      .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
    assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
  }

  test("QueryTerminated serialization") {
    val exception = new RuntimeException("exception")
-    val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
+    val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
      StreamingQueryStatus.testStatus,
      Some(exception.getMessage))
    val json =
      JsonProtocol.sparkEventToJson(queryQueryTerminated)
    val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryTerminated]
+      .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
    assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
    assert(queryQueryTerminated.exception === newQueryTerminated.exception)
  }

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 3 additions & 3 deletions

@@ -290,11 +290,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
    // A StreamingQueryListener that gets the query status after the first completed trigger
    val listener = new StreamingQueryListener {
      @volatile var firstStatus: StreamingQueryStatus = null
-      override def onQueryStarted(queryStarted: QueryStarted): Unit = { }
-      override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        if (firstStatus == null) firstStatus = queryProgress.queryStatus
      }
-      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { }
+      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
    }

    try {
