From c5c2e79a9e84508a125ab86ffcd54e55640b0e3a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 20 Jan 2017 11:29:17 -0800 Subject: [PATCH] Fixed bug --- .../catalyst/analysis/UnsupportedOperationChecker.scala | 2 +- .../catalyst/analysis/UnsupportedOperationsSuite.scala | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index c2666b2ab912..f4d016cb9671 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -87,7 +87,7 @@ object UnsupportedOperationChecker { * data. */ def containsCompleteData(subplan: LogicalPlan): Boolean = { - val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } + val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } // Either the subplan has no streaming source, or it has aggregation with Complete mode !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 58e69f9ebea0..dcdb1ae08932 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -199,12 +199,17 @@ class UnsupportedOperationsSuite extends SparkFunSuite { _.intersect(_), streamStreamSupported = false) - // Sort: supported only on batch subplans and on aggregation + complete output mode + // Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _)) assertSupportedInStreamingPlan( - "sort - sort over aggregated data in Complete output mode", + "sort - sort after aggregation in Complete output mode", streamRelation.groupBy()(Count("*")).sortBy(), Complete) + assertNotSupportedInStreamingPlan( + "sort - sort before aggregation in Complete output mode", + streamRelation.sortBy().groupBy()(Count("*")), + Complete, + Seq("sort", "aggregat", "complete")) assertNotSupportedInStreamingPlan( "sort - sort over aggregated data in Update output mode", streamRelation.groupBy()(Count("*")).sortBy(),