Skip to content

Commit 0d3a631

Browse files
tdaszsxwing
authored andcommitted
[SPARK-20714][SS] Fix match error when watermark is set with timeout = no timeout / processing timeout
## What changes were proposed in this pull request? When watermark is set, and timeout conf is NoTimeout or ProcessingTimeTimeout (both do not need the watermark), the query fails at runtime with the following exception. ``` MatchException: Some(org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate1a9b798e) (of class scala.Some) org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:120) org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:116) org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70) org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65) org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) ``` The match did not correctly handle cases where watermark was defined by the timeout was different from EventTimeTimeout. ## How was this patch tested? New unit tests. Author: Tathagata Das <[email protected]> Closes #17954 from tdas/SPARK-20714.
1 parent 7d6ff39 commit 0d3a631

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ case class FlatMapGroupsWithStateExec(
120120
val filteredIter = watermarkPredicateForData match {
121121
case Some(predicate) if timeoutConf == EventTimeTimeout =>
122122
iter.filter(row => !predicate.eval(row))
123-
case None =>
123+
case _ =>
124124
iter
125125
}
126126

sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
589589
)
590590
}
591591

592-
test("flatMapGroupsWithState - streaming with event time timeout") {
592+
test("flatMapGroupsWithState - streaming with event time timeout + watermark") {
593593
// Function to maintain the max event time
594594
// Returns the max event time in the state, or -1 if the state was removed by timeout
595595
val stateFunc = (
@@ -761,6 +761,44 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
761761
assert(e.getMessage === "The output mode of function should be append or update")
762762
}
763763

764+
def testWithTimeout(timeoutConf: GroupStateTimeout): Unit = {
765+
test("SPARK-20714: watermark does not fail query when timeout = " + timeoutConf) {
766+
// Function to maintain running count up to 2, and then remove the count
767+
// Returns the data and the count (-1 if count reached beyond 2 and state was just removed)
768+
val stateFunc =
769+
(key: String, values: Iterator[(String, Long)], state: GroupState[RunningCount]) => {
770+
if (state.hasTimedOut) {
771+
state.remove()
772+
Iterator((key, "-1"))
773+
} else {
774+
val count = state.getOption.map(_.count).getOrElse(0L) + values.size
775+
state.update(RunningCount(count))
776+
state.setTimeoutDuration("10 seconds")
777+
Iterator((key, count.toString))
778+
}
779+
}
780+
781+
val clock = new StreamManualClock
782+
val inputData = MemoryStream[(String, Long)]
783+
val result =
784+
inputData.toDF().toDF("key", "time")
785+
.selectExpr("key", "cast(time as timestamp) as timestamp")
786+
.withWatermark("timestamp", "10 second")
787+
.as[(String, Long)]
788+
.groupByKey(x => x._1)
789+
.flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
790+
791+
testStream(result, Update)(
792+
StartStream(ProcessingTime("1 second"), triggerClock = clock),
793+
AddData(inputData, ("a", 1L)),
794+
AdvanceManualClock(1 * 1000),
795+
CheckLastBatch(("a", "1"))
796+
)
797+
}
798+
}
799+
testWithTimeout(NoTimeout)
800+
testWithTimeout(ProcessingTimeTimeout)
801+
764802
def testStateUpdateWithData(
765803
testName: String,
766804
stateUpdates: GroupState[Int] => Unit,

0 commit comments

Comments
 (0)