Skip to content

Commit 39eb79a

Browse files
HeartSaVioRsrowen
authored andcommitted
[SPARK-28074][SS] Log warn message on possible correctness issue for multiple stateful operations in single query
## What changes were proposed in this pull request? Please refer [the link on dev. mailing list](https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a%3Cdev.spark.apache.org%3E) to see rationalization of this patch. This patch adds the functionality to detect the possible correct issue on multiple stateful operations in single streaming query and logs warning message to inform end users. This patch also documents some notes to inform caveats when using multiple stateful operations in single query, and provide one known alternative. ## How was this patch tested? Added new UTs in UnsupportedOperationsSuite to test various combination of stateful operators on streaming query. Closes #24890 from HeartSaVioR/SPARK-28074. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent 1018390 commit 39eb79a

File tree

3 files changed

+250
-3
lines changed

3 files changed

+250
-3
lines changed

docs/structured-streaming-programming-guide.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1505,7 +1505,6 @@ Additional details on supported joins:
15051505

15061506
- Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
15071507

1508-
15091508
### Streaming Deduplication
15101509
You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
15111510

@@ -1616,6 +1615,8 @@ this configuration judiciously.
16161615
### Arbitrary Stateful Operations
16171616
Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).
16181617

1618+
Though Spark cannot check and force it, the state function should be implemented with respect to the semantics of the output mode. For example, in Update mode Spark doesn't expect that the state function will emit rows which are older than current watermark plus allowed late record delay, whereas in Append mode the state function can emit these rows.
1619+
16191620
### Unsupported Operations
16201621
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
16211622
Some of them are as follows.
@@ -1647,6 +1648,26 @@ For example, sorting on the input stream is not supported, as it requires keepin
16471648
track of all the data received in the stream. This is therefore fundamentally hard to execute
16481649
efficiently.
16491650

1651+
### Limitation of global watermark
1652+
1653+
In Append mode, if a stateful operation emits rows older than current watermark plus allowed late record delay,
1654+
they will be "late rows" in downstream stateful operations (as Spark uses global watermark). Note that these rows may be discarded.
1655+
This is a limitation of a global watermark, and it could potentially cause a correctness issue.
1656+
1657+
Spark will check the logical plan of query and log a warning when Spark detects such a pattern.
1658+
1659+
Any of the stateful operation(s) after any of below stateful operations can have this issue:
1660+
1661+
* streaming aggregation in Append mode
1662+
* stream-stream outer join
1663+
* `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of the state function)
1664+
1665+
As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
1666+
emits late rows if the operator uses Append mode.
1667+
1668+
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
1669+
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
1670+
16501671
## Starting Streaming Queries
16511672
Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter`
16521673
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter) docs)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20+
import org.apache.spark.internal.Logging
2021
import org.apache.spark.sql.AnalysisException
2122
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, CurrentDate, CurrentTimestamp, MonotonicallyIncreasingID}
2223
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -30,7 +31,7 @@ import org.apache.spark.sql.streaming.OutputMode
3031
/**
3132
* Analyzes the presence of unsupported operations in a logical plan.
3233
*/
33-
object UnsupportedOperationChecker {
34+
object UnsupportedOperationChecker extends Logging {
3435

3536
def checkForBatch(plan: LogicalPlan): Unit = {
3637
plan.foreachUp {
@@ -41,8 +42,50 @@ object UnsupportedOperationChecker {
4142
}
4243
}
4344

44-
def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
45+
def checkStreamingQueryGlobalWatermarkLimit(
46+
plan: LogicalPlan,
47+
outputMode: OutputMode,
48+
failWhenDetected: Boolean): Unit = {
49+
def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
50+
case s: Aggregate
51+
if s.isStreaming && outputMode == InternalOutputModes.Append => true
52+
case Join(left, right, joinType, _, _)
53+
if left.isStreaming && right.isStreaming && joinType != Inner => true
54+
case f: FlatMapGroupsWithState
55+
if f.isStreaming && f.outputMode == OutputMode.Append() => true
56+
case _ => false
57+
}
58+
59+
def isStatefulOperation(p: LogicalPlan): Boolean = p match {
60+
case s: Aggregate if s.isStreaming => true
61+
case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
62+
case f: FlatMapGroupsWithState if f.isStreaming => true
63+
case d: Deduplicate if d.isStreaming => true
64+
case _ => false
65+
}
66+
67+
try {
68+
plan.foreach { subPlan =>
69+
if (isStatefulOperation(subPlan)) {
70+
subPlan.find { p =>
71+
(p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p)
72+
}.foreach { _ =>
73+
val errorMsg = "Detected pattern of possible 'correctness' issue " +
74+
"due to global watermark. " +
75+
"The query contains stateful operation which can emit rows older than " +
76+
"the current watermark plus allowed late record delay, which are \"late rows\"" +
77+
" in downstream stateful operations and these rows can be discarded. " +
78+
"Please refer the programming guide doc for more details."
79+
throwError(errorMsg)(plan)
80+
}
81+
}
82+
}
83+
} catch {
84+
case e: AnalysisException if !failWhenDetected => logWarning(s"${e.message};\n$plan")
85+
}
86+
}
4587

88+
def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
4689
if (!plan.isStreaming) {
4790
throwError(
4891
"Queries without streaming sources cannot be executed with writeStream.start()")(plan)
@@ -339,6 +382,8 @@ object UnsupportedOperationChecker {
339382
// Check if there are unsupported expressions in streaming query plan.
340383
checkUnsupportedExpressions(subPlan)
341384
}
385+
386+
checkStreamingQueryGlobalWatermarkLimit(plan, outputMode, failWhenDetected = false)
342387
}
343388

344389
def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,153 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
643643
null,
644644
new TestStreamingRelationV2(attribute)), OutputMode.Append())
645645

646+
// streaming aggregation
647+
{
648+
assertPassOnGlobalWatermarkLimit(
649+
"single streaming aggregation in Append mode",
650+
streamRelation.groupBy("a")(count("*")),
651+
OutputMode.Append())
652+
653+
assertFailOnGlobalWatermarkLimit(
654+
"chained streaming aggregations in Append mode",
655+
streamRelation.groupBy("a")(count("*")).groupBy()(count("*")),
656+
OutputMode.Append())
657+
658+
Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
659+
val plan = streamRelation.join(streamRelation.groupBy("a")(count("*")), joinType = joinType)
660+
assertFailOnGlobalWatermarkLimit(
661+
s"$joinType join after streaming aggregation in Append mode",
662+
streamRelation.join(streamRelation.groupBy("a")(count("*")), joinType = joinType),
663+
OutputMode.Append())
664+
}
665+
666+
assertFailOnGlobalWatermarkLimit(
667+
"deduplicate after streaming aggregation in Append mode",
668+
Deduplicate(Seq(attribute), streamRelation.groupBy("a")(count("*"))),
669+
OutputMode.Append())
670+
671+
assertFailOnGlobalWatermarkLimit(
672+
"FlatMapGroupsWithState after streaming aggregation in Append mode",
673+
FlatMapGroupsWithState(
674+
null, att, att, Seq(att), Seq(att), att, null, Append,
675+
isMapGroupsWithState = false, null,
676+
streamRelation.groupBy("a")(count("*"))),
677+
OutputMode.Append())
678+
}
679+
680+
// stream-stream join
681+
// stream-stream inner join doesn't emit late rows, whereas outer joins could
682+
Seq((Inner, false), (LeftOuter, true), (RightOuter, true)).map { case (joinType, expectFailure) =>
683+
assertPassOnGlobalWatermarkLimit(
684+
s"single $joinType join in Append mode",
685+
streamRelation.join(streamRelation, joinType = RightOuter,
686+
condition = Some(attributeWithWatermark === attribute)),
687+
OutputMode.Append())
688+
689+
testGlobalWatermarkLimit(
690+
s"streaming aggregation after stream-stream $joinType join in Append mode",
691+
streamRelation.join(streamRelation, joinType = joinType,
692+
condition = Some(attributeWithWatermark === attribute))
693+
.groupBy("a")(count("*")),
694+
OutputMode.Append(),
695+
expectFailure = expectFailure)
696+
697+
Seq(Inner, LeftOuter, RightOuter).map { joinType2 =>
698+
testGlobalWatermarkLimit(
699+
s"streaming-stream $joinType2 after stream-stream $joinType join in Append mode",
700+
streamRelation.join(
701+
streamRelation.join(streamRelation, joinType = joinType,
702+
condition = Some(attributeWithWatermark === attribute)),
703+
joinType = joinType2,
704+
condition = Some(attributeWithWatermark === attribute)),
705+
OutputMode.Append(),
706+
expectFailure = expectFailure)
707+
}
708+
709+
testGlobalWatermarkLimit(
710+
s"FlatMapGroupsWithState after stream-stream $joinType join in Append mode",
711+
FlatMapGroupsWithState(
712+
null, att, att, Seq(att), Seq(att), att, null, Append,
713+
isMapGroupsWithState = false, null,
714+
streamRelation.join(streamRelation, joinType = joinType,
715+
condition = Some(attributeWithWatermark === attribute))),
716+
OutputMode.Append(),
717+
expectFailure = expectFailure)
718+
719+
testGlobalWatermarkLimit(
720+
s"deduplicate after stream-stream $joinType join in Append mode",
721+
Deduplicate(Seq(attribute), streamRelation.join(streamRelation, joinType = joinType,
722+
condition = Some(attributeWithWatermark === attribute))),
723+
OutputMode.Append(),
724+
expectFailure = expectFailure)
725+
}
726+
727+
// FlatMapGroupsWithState
728+
{
729+
assertPassOnGlobalWatermarkLimit(
730+
"single FlatMapGroupsWithState in Append mode",
731+
FlatMapGroupsWithState(
732+
null, att, att, Seq(att), Seq(att), att, null, Append,
733+
isMapGroupsWithState = false, null, streamRelation),
734+
OutputMode.Append())
735+
736+
assertFailOnGlobalWatermarkLimit(
737+
"streaming aggregation after FlatMapGroupsWithState in Append mode",
738+
FlatMapGroupsWithState(
739+
null, att, att, Seq(att), Seq(att), att, null, Append,
740+
isMapGroupsWithState = false, null, streamRelation).groupBy("*")(count("*")),
741+
OutputMode.Append())
742+
743+
Seq(Inner, LeftOuter, RightOuter).map { joinType =>
744+
assertFailOnGlobalWatermarkLimit(
745+
s"stream-stream $joinType after FlatMapGroupsWithState in Append mode",
746+
streamRelation.join(
747+
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
748+
isMapGroupsWithState = false, null, streamRelation), joinType = joinType,
749+
condition = Some(attributeWithWatermark === attribute)),
750+
OutputMode.Append())
751+
}
752+
753+
assertFailOnGlobalWatermarkLimit(
754+
"FlatMapGroupsWithState after FlatMapGroupsWithState in Append mode",
755+
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
756+
isMapGroupsWithState = false, null,
757+
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
758+
isMapGroupsWithState = false, null, streamRelation)),
759+
OutputMode.Append())
760+
761+
assertFailOnGlobalWatermarkLimit(
762+
s"deduplicate after FlatMapGroupsWithState in Append mode",
763+
Deduplicate(Seq(attribute),
764+
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
765+
isMapGroupsWithState = false, null, streamRelation)),
766+
OutputMode.Append())
767+
}
768+
769+
// deduplicate
770+
{
771+
assertPassOnGlobalWatermarkLimit(
772+
"streaming aggregation after deduplicate in Append mode",
773+
Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")),
774+
OutputMode.Append())
775+
776+
Seq(Inner, LeftOuter, RightOuter).map { joinType =>
777+
assertPassOnGlobalWatermarkLimit(
778+
s"$joinType join after deduplicate in Append mode",
779+
streamRelation.join(Deduplicate(Seq(attribute), streamRelation), joinType = joinType,
780+
condition = Some(attributeWithWatermark === attribute)),
781+
OutputMode.Append())
782+
}
783+
784+
assertPassOnGlobalWatermarkLimit(
785+
"FlatMapGroupsWithState after deduplicate in Append mode",
786+
FlatMapGroupsWithState(
787+
null, att, att, Seq(att), Seq(att), att, null, Append,
788+
isMapGroupsWithState = false, null,
789+
Deduplicate(Seq(attribute), streamRelation)),
790+
OutputMode.Append())
791+
}
792+
646793
/*
647794
=======================================================================================
648795
TESTING FUNCTIONS
@@ -839,6 +986,40 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
839986
}
840987
}
841988

989+
990+
def assertPassOnGlobalWatermarkLimit(
991+
testNamePostfix: String,
992+
plan: LogicalPlan,
993+
outputMode: OutputMode): Unit = {
994+
testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false)
995+
}
996+
997+
def assertFailOnGlobalWatermarkLimit(
998+
testNamePostfix: String,
999+
plan: LogicalPlan,
1000+
outputMode: OutputMode): Unit = {
1001+
testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true)
1002+
}
1003+
1004+
def testGlobalWatermarkLimit(
1005+
testNamePostfix: String,
1006+
plan: LogicalPlan,
1007+
outputMode: OutputMode,
1008+
expectFailure: Boolean): Unit = {
1009+
test(s"Global watermark limit - $testNamePostfix") {
1010+
if (expectFailure) {
1011+
val e = intercept[AnalysisException] {
1012+
UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
1013+
wrapInStreaming(plan), outputMode, failWhenDetected = true)
1014+
}
1015+
assert(e.message.contains("Detected pattern of possible 'correctness' issue"))
1016+
} else {
1017+
UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
1018+
wrapInStreaming(plan), outputMode, failWhenDetected = true)
1019+
}
1020+
}
1021+
}
1022+
8421023
/**
8431024
* Test whether the body of code will fail. If it does fail, then check if it has expected
8441025
* messages.

0 commit comments

Comments
 (0)