Commit 2995b79

jerryshao authored and tdas committed
[SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryAlias issue
## What changes were proposed in this pull request?

Continuous processing in Structured Streaming currently fails when the query reads from a temp table or uses `df.as("xxx")`: SS throws a "LogicalPlan not supported" exception, as described in [SPARK-23748](https://issues.apache.org/jira/browse/SPARK-23748). This patch adds support for `SubqueryAlias` in continuous processing.

## How was this patch tested?

New unit test.

Author: jerryshao <[email protected]>

Closes #21017 from jerryshao/SPARK-23748.

(cherry picked from commit 14291b0)

Signed-off-by: Tathagata Das <[email protected]>
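The user-facing failure mode the patch addresses can be sketched as follows. This is a minimal illustration, not code from the patch: the `rate` source options, the alias name, and the console sink are illustrative choices. Before this change, the `.as(...)` call introduces a `SubqueryAlias` node into the logical plan, which `UnsupportedOperationChecker` rejected for continuous queries.

```scala
// Sketch only; requires a running SparkSession (`spark`) with a V2-capable sink.
import org.apache.spark.sql.streaming.Trigger

val aliased = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .as("r")                 // adds a SubqueryAlias node to the logical plan
  .select("r.value")

// Before SPARK-23748 this start() failed with
// "Continuous processing does not support SubqueryAlias operations."
val query = aliased.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```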
1 parent 908c681 commit 2995b79

File tree

2 files changed: +20 −1 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -345,7 +345,7 @@ object UnsupportedOperationChecker {
     plan.foreachUp { implicit subPlan =>
       subPlan match {
         case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
-          _: DeserializeToObject | _: SerializeFromObject) =>
+          _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias) =>
         case node if node.nodeName == "StreamingRelationV2" =>
         case node =>
           throwError(s"Continuous processing does not support ${node.nodeName} operations.")
```
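The checker's approach is a whitelist: pattern-match the plan-node types that continuous processing supports and fall through to an error for anything else, so adding `SubqueryAlias` to the match is the whole fix. A standalone sketch of that shape, using toy stand-in classes rather than Spark's real plan nodes:

```scala
// Toy sketch of the whitelist pattern in UnsupportedOperationChecker.
// Node, Project, Filter, SubqueryAlias, and Aggregate here are
// illustrative stand-ins, not Spark's actual classes.
sealed trait Node { def nodeName: String = getClass.getSimpleName }
case class Project() extends Node
case class Filter() extends Node
case class SubqueryAlias() extends Node
case class Aggregate() extends Node

def check(plan: Seq[Node]): Unit = plan.foreach {
  // Whitelisted node types: silently accepted.
  case _: Project | _: Filter | _: SubqueryAlias =>
  // Anything else: rejected, mirroring the checker's throwError call.
  case node =>
    throw new UnsupportedOperationException(
      s"Continuous processing does not support ${node.nodeName} operations.")
}

check(Seq(Project(), SubqueryAlias()))  // accepted
// check(Seq(Aggregate()))              // would throw
```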

sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala

Lines changed: 19 additions & 0 deletions

```diff
@@ -174,6 +174,25 @@ class ContinuousSuite extends ContinuousSuiteBase {
       "Continuous processing does not support current time operations."))
   }
 
+  test("subquery alias") {
+    val df = spark.readStream
+      .format("rate")
+      .option("numPartitions", "5")
+      .option("rowsPerSecond", "5")
+      .load()
+      .createOrReplaceTempView("rate")
+    val test = spark.sql("select value from rate where value > 5")
+
+    testStream(test, useV2Sink = true)(
+      StartStream(longContinuousTrigger),
+      AwaitEpoch(0),
+      Execute(waitForRateSourceTriggers(_, 2)),
+      IncrementEpoch(),
+      Execute(waitForRateSourceTriggers(_, 4)),
+      IncrementEpoch(),
+      CheckAnswerRowsContains(scala.Range(6, 20).map(Row(_))))
+  }
+
   test("repeatedly restart") {
     val df = spark.readStream
       .format("rate")
```
