From 550ff99652de515e9ee056596350a8cbf802f938 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 9 Feb 2018 14:57:08 +0100
Subject: [PATCH 01/11] [SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer

---
 .../sql/catalyst/optimizer/Optimizer.scala    |  3 +++
 .../catalyst/plans/logical/LogicalPlan.scala  | 11 +++++++++++
 .../plans/logical/basicLogicalOperators.scala | 17 ++++++++++++-----
 .../optimizer/EliminateSortsSuite.scala       | 15 ++++++++++++++-
 4 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a28b6a0feb8f9..775633695ca53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -730,6 +730,9 @@ object EliminateSorts extends Rule[LogicalPlan] {
     case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
       val newOrders = orders.filterNot(_.child.foldable)
       if (newOrders.isEmpty) child else s.copy(order = newOrders)
+    case Sort(orders, true, child) if child.isSorted && child.sortedOrder.get.zip(orders).forall {
+      case (s1, s2) => s1.satisfies(s2) } =>
+      child
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index c8ccd9bd03994..5b28d79537168 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -219,6 +219,13 @@ abstract class LogicalPlan
    * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
    */
   def refresh(): Unit = children.foreach(_.refresh())
+
+  /**
+   * If the current plan contains sorted data, it contains the sorted order.
+   */
+  def sortedOrder: Option[Seq[SortOrder]] = None
+
+  final def isSorted: Boolean = sortedOrder.isDefined
 }
 
 /**
@@ -274,3 +281,7 @@ abstract class BinaryNode extends LogicalPlan {
 
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class KeepOrderUnaryNode extends UnaryNode {
+  override final def sortedOrder: Option[Seq[SortOrder]] = child.sortedOrder
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a4fca790dd086..bc72dc4c92a88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -43,11 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
  * recognize a subquery as such, and it allows us to write subquery aware transformations.
  */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
+case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
+  extends KeepOrderUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
   override def maxRows: Option[Long] = child.maxRows
 
@@ -125,7 +126,7 @@ case class Generate(
 }
 
 case class Filter(condition: Expression, child: LogicalPlan)
-  extends UnaryNode with PredicateHelper {
+  extends KeepOrderUnaryNode with PredicateHelper {
   override def output: Seq[Attribute] = child.output
 
   override def maxRows: Option[Long] = child.maxRows
@@ -469,6 +470,7 @@ case class Sort(
     child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = child.maxRows
+  override def sortedOrder: Option[Seq[SortOrder]] = Some(order)
 }
 
 /** Factory for constructing new `Range` nodes. */
@@ -728,7 +730,7 @@ object Limit {
  *
  * See [[Limit]] for more information.
  */
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = {
     limitExpr match {
@@ -764,7 +766,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
 case class SubqueryAlias(
     alias: String,
     child: LogicalPlan)
-  extends UnaryNode {
+  extends KeepOrderUnaryNode {
 
   override def doCanonicalize(): LogicalPlan = child.canonicalized
 
@@ -867,6 +869,11 @@ case class RepartitionByExpression(
 
   override def maxRows: Option[Long] = child.maxRows
   override def shuffle: Boolean = true
+
+  override def sortedOrder: Option[Seq[SortOrder]] = partitioning match {
+    case RangePartitioning(sortedOrder, _) => Some(sortedOrder)
+    case _ => None
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index e318f36d78270..e1f53ef6388de 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -37,7 +37,8 @@ class EliminateSortsSuite extends PlanTest {
     val batches =
       Batch("Eliminate Sorts", FixedPoint(10),
         FoldablePropagation,
-        EliminateSorts) :: Nil
+        EliminateSorts,
+        CollapseProject) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -83,4 +84,16 @@ class EliminateSortsSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("remove redundant order by") {
+    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
+    val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
+    val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered))
+    val correctAnswer = analyzer.execute(orderedPlan.select('a))
+    comparePlans(Optimize.execute(optimized), correctAnswer)
+    val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
+    val nonOptimized = Optimize.execute(analyzer.execute(reorderedDifferently))
+    val correctAnswerNonOptimized = analyzer.execute(reorderedDifferently)
+    comparePlans(nonOptimized, correctAnswerNonOptimized)
+  }
 }
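A note on the guard PATCH 01 adds to EliminateSorts, before PATCH 02 reworks it: child.sortedOrder.get.zip(orders) silently truncates to the shorter of the two sequences, so a requested ordering longer than what the child guarantees could still pass the forall; the SortOrder.orderingSatisfies helper adopted later in the series also compares lengths. A minimal self-contained sketch of the intended prefix semantics (hypothetical simplified types, not the actual Catalyst classes):

    // Sketch only: SortKey stands in for Catalyst's SortOrder, and "satisfies"
    // is plain equality; the real SortOrder.satisfies also accepts
    // semantically equal expressions.
    object OrderingSatisfiesSketch {
      sealed trait Direction
      case object Ascending extends Direction
      case object Descending extends Direction

      final case class SortKey(column: String, direction: Direction, nullsFirst: Boolean)

      // The child's ordering satisfies the requested one when the request is a
      // prefix of what the child guarantees (empty requests are trivially met).
      def orderingSatisfies(provided: Seq[SortKey], required: Seq[SortKey]): Boolean =
        required.isEmpty ||
          (required.size <= provided.size &&
            provided.zip(required).forall { case (p, r) => p == r })

      def main(args: Array[String]): Unit = {
        val childOrder = Seq(
          SortKey("a", Ascending, nullsFirst = false),
          SortKey("b", Descending, nullsFirst = true))
        // Requesting a prefix of the existing order: the Sort is redundant.
        println(orderingSatisfies(childOrder, childOrder.take(1))) // true
        // Longer request than the child guarantees: the Sort must stay.
        val longer = childOrder :+ SortKey("c", Ascending, nullsFirst = false)
        println(orderingSatisfies(childOrder, longer)) // false
      }
    }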
From 81e48286806d36e7630e961168d87cbad4f10194 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Sat, 10 Feb 2018 13:00:56 +0100
Subject: [PATCH 02/11] Use separate rule and add more tests

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 14 ++-
 .../catalyst/plans/logical/LogicalPlan.scala  |  6 +-
 .../plans/logical/basicLogicalOperators.scala | 12 ++-
 .../optimizer/EliminateSortsSuite.scala       | 15 +--
 .../optimizer/RemoveRedundantSortsSuite.scala | 93 +++++++++++++++++++
 .../execution/columnar/InMemoryRelation.scala |  2 +
 .../spark/sql/execution/PlannerSuite.scala    | 18 +++-
 7 files changed, 133 insertions(+), 27 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 775633695ca53..93097852528c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -140,6 +140,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       operatorOptimizationBatch) :+
     Batch("Join Reorder", Once,
       CostBasedJoinReorder) :+
+    Batch("Remove Redundant Sorts", Once,
+      RemoveRedundantSorts) :+
     Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) :+
     Batch("Object Expressions Optimization", fixedPoint,
@@ -730,8 +732,16 @@ object EliminateSorts extends Rule[LogicalPlan] {
     case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
       val newOrders = orders.filterNot(_.child.foldable)
       if (newOrders.isEmpty) child else s.copy(order = newOrders)
-    case Sort(orders, true, child) if child.isSorted && child.sortedOrder.get.zip(orders).forall {
-      case (s1, s2) => s1.satisfies(s2) } =>
-      child
   }
 }
+
+/**
+ * Removes Sort operations on already sorted data
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.sortedOrder.nonEmpty
+      && child.sortedOrder.zip(orders).forall { case (s1, s2) => s1.satisfies(s2) } =>
+      child
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 5b28d79537168..99b3fe6588f2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -223,9 +223,7 @@ abstract class LogicalPlan
   /**
    * If the current plan contains sorted data, it contains the sorted order.
    */
-  def sortedOrder: Option[Seq[SortOrder]] = None
-
-  final def isSorted: Boolean = sortedOrder.isDefined
+  def sortedOrder: Seq[SortOrder] = Nil
 }
 
 /**
@@ -283,5 +281,5 @@ abstract class BinaryNode extends LogicalPlan {
 }
 
 abstract class KeepOrderUnaryNode extends UnaryNode {
-  override final def sortedOrder: Option[Seq[SortOrder]] = child.sortedOrder
+  override final def sortedOrder: Seq[SortOrder] = child.sortedOrder
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index bc72dc4c92a88..127398d7e7389 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -470,7 +470,7 @@ case class Sort(
     child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = child.maxRows
-  override def sortedOrder: Option[Seq[SortOrder]] = Some(order)
+  override def sortedOrder: Seq[SortOrder] = order
 }
 
 /** Factory for constructing new `Range` nodes. */
@@ -524,6 +524,8 @@ case class Range(
   override def computeStats(): Statistics = {
     Statistics(sizeInBytes = LongType.defaultSize * numElements)
   }
+
+  override def sortedOrder: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
 }
 
 case class Aggregate(
@@ -746,7 +748,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOr
  *
  * See [[Limit]] for more information.
  */
-case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override def maxRowsPerPartition: Option[Long] = {
@@ -870,9 +872,9 @@ case class RepartitionByExpression(
   override def maxRows: Option[Long] = child.maxRows
   override def shuffle: Boolean = true
 
-  override def sortedOrder: Option[Seq[SortOrder]] = partitioning match {
-    case RangePartitioning(sortedOrder, _) => Some(sortedOrder)
-    case _ => None
+  override def sortedOrder: Seq[SortOrder] = partitioning match {
+    case RangePartitioning(sortedOrder, _) => sortedOrder
+    case _ => Nil
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index e1f53ef6388de..e318f36d78270 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -37,8 +37,7 @@ class EliminateSortsSuite extends PlanTest {
     val batches =
       Batch("Eliminate Sorts", FixedPoint(10),
         FoldablePropagation,
-        EliminateSorts,
-        CollapseProject) :: Nil
+        EliminateSorts) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -84,16 +83,4 @@ class EliminateSortsSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
-
-  test("remove redundant order by") {
-    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
-    val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
-    val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered))
-    val correctAnswer = analyzer.execute(orderedPlan.select('a))
-    comparePlans(Optimize.execute(optimized), correctAnswer)
-    val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
-    val nonOptimized = Optimize.execute(analyzer.execute(reorderedDifferently))
-    val correctAnswerNonOptimized = analyzer.execute(reorderedDifferently)
-    comparePlans(nonOptimized, correctAnswerNonOptimized)
-  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
new file mode 100644
index 0000000000000..9bc53b756ae01
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL}
+
+class RemoveRedundantSortsSuite extends PlanTest {
+  override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, ORDER_BY_ORDINAL -> false)
+  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Remove Redundant Sorts", Once,
+        RemoveRedundantSorts) ::
+      Batch("Collapse Project", Once,
+        CollapseProject) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("remove redundant order by") {
+    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
+    val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
+    val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered))
+    val correctAnswer = analyzer.execute(orderedPlan.select('a))
+    comparePlans(Optimize.execute(optimized), correctAnswer)
+  }
+
+  test("do not remove sort if the order is different") {
+    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
+    val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
+    val optimized = Optimize.execute(analyzer.execute(reorderedDifferently))
+    val correctAnswer = analyzer.execute(reorderedDifferently)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("filters don't affect order") {
+    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
+    val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc)
+    val optimized = Optimize.execute(analyzer.execute(filteredAndReordered))
+    val correctAnswer = analyzer.execute(orderedPlan.where('a > Literal(10)))
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("limits don't affect order") {
+    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
+    val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc)
+    val optimized = Optimize.execute(analyzer.execute(filteredAndReordered))
+    val correctAnswer = analyzer.execute(orderedPlan.limit(Literal(10)))
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("range is already sorted") {
+    val inputPlan = Range(1L, 1000L, 1, 10)
+    val orderedPlan = inputPlan.orderBy('id.desc)
+    val optimized = Optimize.execute(analyzer.execute(orderedPlan))
+    val correctAnswer = analyzer.execute(inputPlan)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("sort should not be removed when there is a node which doesn't guarantee any order") {
+    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
+    val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
+    val optimized = Optimize.execute(analyzer.execute(groupedAndResorted))
+    val correctAnswer = analyzer.execute(groupedAndResorted)
+    comparePlans(optimized, correctAnswer)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 22e16913d4da9..84618cf4105f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -169,4 +169,6 @@ case class InMemoryRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
+
+  override def sortedOrder: Seq[SortOrder] = child.outputOrdering
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index f8b26f5b28cc7..a158a42b3df9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -22,10 +22,11 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -197,6 +198,19 @@ class PlannerSuite extends SharedSQLContext {
     assert(planned.child.isInstanceOf[CollectLimitExec])
   }
 
+  test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
+    val query = testData.select('key, 'value).sort('key.desc).cache()
+    assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])
+    val resorted = query.sort('key.desc)
+    assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty)
+    assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
+      (1 to 100).sorted(Ordering[Int].reverse))
+    // with a different order, the sort is needed
+    val sortedAsc = query.sort('key)
+    assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s}.nonEmpty)
+    assert(sortedAsc.select('key).collect().map(_.getInt(0)).toSeq == (1 to 100))
+  }
+
   test("PartitioningCollection") {
     withTempView("normal", "small", "tiny") {
       testData.createOrReplaceTempView("normal")
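PATCH 02 moves the check out of EliminateSorts into a dedicated RemoveRedundantSorts rule, run in its own Once batch after join reordering so it sees the operator tree that the earlier fixed-point batches settle on. A toy model of the transform-with-partial-function pattern such a rule relies on (hypothetical mini plan ADT, not Catalyst):

    // Sketch: Catalyst's `plan transform { ... }` applies a partial function
    // over the whole tree; an optimizer rule is just such a rewrite.
    object TransformSketch {
      sealed trait Plan {
        def children: Seq[Plan]
        def withChildren(cs: Seq[Plan]): Plan
        // Pre-order rewrite: try the rule here, then recurse into children.
        def transform(rule: PartialFunction[Plan, Plan]): Plan = {
          val applied = rule.applyOrElse(this, identity[Plan])
          applied.withChildren(applied.children.map(_.transform(rule)))
        }
      }
      final case class Scan(table: String) extends Plan {
        def children: Seq[Plan] = Nil
        def withChildren(cs: Seq[Plan]): Plan = this
      }
      final case class Sort(keys: Seq[String], child: Plan) extends Plan {
        def children: Seq[Plan] = Seq(child)
        def withChildren(cs: Seq[Plan]): Plan = copy(child = cs.head)
      }

      // Analogue of RemoveRedundantSorts: drop a Sort whose child already
      // guarantees an ordering with the requested keys as a prefix.
      val removeRedundantSorts: PartialFunction[Plan, Plan] = {
        case Sort(keys, inner @ Sort(innerKeys, _)) if innerKeys.startsWith(keys) => inner
      }

      def main(args: Array[String]): Unit = {
        val plan = Sort(Seq("a"), Sort(Seq("a", "b"), Scan("t")))
        println(plan.transform(removeRedundantSorts)) // Sort(List(a, b),Scan(t))
      }
    }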
From 1c33263c275746b25727d04e3a7ada14140b0b68 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Mon, 2 Apr 2018 11:37:02 +0200
Subject: [PATCH 03/11] rename to outputOrdering

---
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala  | 4 ++--
 .../spark/sql/catalyst/plans/logical/LogicalPlan.scala   | 6 +++---
 .../catalyst/plans/logical/basicLogicalOperators.scala   | 8 ++++----
 .../org/apache/spark/sql/execution/ExistingRDD.scala     | 2 +-
 .../spark/sql/execution/columnar/InMemoryRelation.scala  | 2 +-
 5 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 93097852528c6..059b9bbf57dc1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -740,8 +740,8 @@ object EliminateSorts extends Rule[LogicalPlan] {
  */
 object RemoveRedundantSorts extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Sort(orders, true, child) if child.sortedOrder.nonEmpty
-      && child.sortedOrder.zip(orders).forall { case (s1, s2) => s1.satisfies(s2) } =>
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
+      && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
       child
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 99b3fe6588f2e..dbb09c62a2ab3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -221,9 +221,9 @@ abstract class LogicalPlan
   def refresh(): Unit = children.foreach(_.refresh())
 
   /**
-   * If the current plan contains sorted data, it contains the sorted order.
+   * Returns the output ordering that this plan generates.
    */
-  def sortedOrder: Seq[SortOrder] = Nil
+  def outputOrdering: Seq[SortOrder] = Nil
 }
 
 /**
@@ -281,5 +281,5 @@ abstract class BinaryNode extends LogicalPlan {
 }
 
 abstract class KeepOrderUnaryNode extends UnaryNode {
-  override final def sortedOrder: Seq[SortOrder] = child.sortedOrder
+  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 127398d7e7389..1e91ad2d93c35 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -470,7 +470,7 @@ case class Sort(
     child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = child.maxRows
-  override def sortedOrder: Seq[SortOrder] = order
+  override def outputOrdering: Seq[SortOrder] = order
 }
 
 /** Factory for constructing new `Range` nodes. */
@@ -525,7 +525,7 @@ case class Range(
     Statistics(sizeInBytes = LongType.defaultSize * numElements)
   }
 
-  override def sortedOrder: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
+  override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
 }
 
 case class Aggregate(
@@ -872,8 +872,8 @@ case class RepartitionByExpression(
   override def maxRows: Option[Long] = child.maxRows
   override def shuffle: Boolean = true
 
-  override def sortedOrder: Seq[SortOrder] = partitioning match {
-    case RangePartitioning(sortedOrder, _) => sortedOrder
+  override def outputOrdering: Seq[SortOrder] = partitioning match {
+    case RangePartitioning(ordering, _) => ordering
     case _ => Nil
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index f3555508185fe..be50a1571a2ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -125,7 +125,7 @@ case class LogicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     outputPartitioning: Partitioning = UnknownPartitioning(0),
-    outputOrdering: Seq[SortOrder] = Nil,
+    override val outputOrdering: Seq[SortOrder] = Nil,
     override val isStreaming: Boolean = false)(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 84618cf4105f1..3d27f7f423049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -170,5 +170,5 @@ case class InMemoryRelation(
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
 
-  override def sortedOrder: Seq[SortOrder] = child.outputOrdering
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
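One Scala detail makes the ExistingRDD change in PATCH 03 so small: a case-class constructor parameter declared as override val can override an inherited parameterless def, so LogicalRDD's existing outputOrdering field starts feeding the new LogicalPlan API with no further code. A minimal illustration (hypothetical names):

    // Sketch: overriding an inherited `def` with a constructor `val`.
    object OverrideValSketch extends App {
      abstract class Node {
        def outputOrdering: Seq[String] = Nil // default: no known ordering
      }

      // `override val` in the parameter list both declares the field and
      // overrides Node.outputOrdering.
      final case class LeafWithOrdering(override val outputOrdering: Seq[String]) extends Node

      println(LeafWithOrdering(Seq("id ASC")).outputOrdering) // List(id ASC)
    }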
From 60ea6fcee5777d79eb3f9b6e86db1a5516995e7a Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Wed, 4 Apr 2018 14:25:05 +0200
Subject: [PATCH 04/11] remove outputOrdering for RangePartitioning

---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 1e91ad2d93c35..4b2b1b7c2f9c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -871,11 +871,6 @@ case class RepartitionByExpression(
 
   override def maxRows: Option[Long] = child.maxRows
   override def shuffle: Boolean = true
-
-  override def outputOrdering: Seq[SortOrder] = partitioning match {
-    case RangePartitioning(ordering, _) => ordering
-    case _ => Nil
-  }
 }
 
 /**
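Dropping the RepartitionByExpression override in PATCH 04 is a correctness fix: range repartitioning only routes each row to the partition whose key range contains it, and rows inside a partition keep their arrival order, so the operator cannot promise a sorted output. A toy demonstration of that distinction (plain Scala, no Spark):

    // Sketch: range partitioning groups rows by key range but does not sort
    // within a partition.
    object RangePartitionSketch extends App {
      val rows = Seq(7, 2, 9, 1, 8, 3)
      val bounds = Seq(4) // two partitions: keys <= 4 and keys > 4

      def partitionId(key: Int): Int = bounds.count(_ < key)

      val partitions = rows.groupBy(partitionId).toSeq.sortBy(_._1).map(_._2)
      println(partitions) // List(List(2, 1, 3), List(7, 9, 8))
      // Partitions are ordered relative to each other, but each one preserves
      // the arrival order of its rows; none is internally sorted here.
      println(partitions.map(p => p == p.sorted)) // List(false, false)
    }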
From 1c7cae685314bf762b38defb9233dbef315ab0df Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Tue, 10 Apr 2018 14:17:26 +0200
Subject: [PATCH 05/11] fix Range ordering + add more Range UT + address
 comments

---
 .../sql/catalyst/optimizer/Optimizer.scala    |  2 +-
 .../plans/logical/basicLogicalOperators.scala |  9 ++++-
 .../optimizer/RemoveRedundantSortsSuite.scala | 40 +++++++++++--------
 .../spark/sql/execution/PlannerSuite.scala    |  4 +--
 4 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 059b9bbf57dc1..82a0ca6a5795f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -736,7 +736,7 @@ object EliminateSorts extends Rule[LogicalPlan] {
 }
 
 /**
- * Removes Sort operations on already sorted data
+ * Removes Sort operation if the child is already sorted
  */
 object RemoveRedundantSorts extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4b2b1b7c2f9c4..ff1409a1fe450 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -525,7 +525,14 @@ case class Range(
     Statistics(sizeInBytes = LongType.defaultSize * numElements)
   }
 
-  override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
+  override def outputOrdering: Seq[SortOrder] = {
+    val order = if (step > 0) {
+      Ascending
+    } else {
+      Descending
+    }
+    output.map(a => SortOrder(a, order))
+  }
 }
 
 case class Aggregate(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
index 9bc53b756ae01..2319ab8046e56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
@@ -29,9 +29,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL}
 
 class RemoveRedundantSortsSuite extends PlanTest {
-  override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, ORDER_BY_ORDINAL -> false)
-  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
-  val analyzer = new Analyzer(catalog, conf)
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -46,48 +43,59 @@ class RemoveRedundantSortsSuite extends PlanTest {
   test("remove redundant order by") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
     val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
-    val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered))
-    val correctAnswer = analyzer.execute(orderedPlan.select('a))
+    val optimized = Optimize.execute(unnecessaryReordered.analyze)
+    val correctAnswer = orderedPlan.select('a).analyze
     comparePlans(Optimize.execute(optimized), correctAnswer)
   }
 
   test("do not remove sort if the order is different") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
     val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
-    val optimized = Optimize.execute(analyzer.execute(reorderedDifferently))
-    val correctAnswer = analyzer.execute(reorderedDifferently)
+    val optimized = Optimize.execute(reorderedDifferently.analyze)
+    val correctAnswer = reorderedDifferently.analyze
     comparePlans(optimized, correctAnswer)
   }
 
   test("filters don't affect order") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
     val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc)
-    val optimized = Optimize.execute(analyzer.execute(filteredAndReordered))
-    val correctAnswer = analyzer.execute(orderedPlan.where('a > Literal(10)))
+    val optimized = Optimize.execute(filteredAndReordered.analyze)
+    val correctAnswer = orderedPlan.where('a > Literal(10)).analyze
     comparePlans(optimized, correctAnswer)
   }
 
   test("limits don't affect order") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
     val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc)
-    val optimized = Optimize.execute(analyzer.execute(filteredAndReordered))
-    val correctAnswer = analyzer.execute(orderedPlan.limit(Literal(10)))
+    val optimized = Optimize.execute(filteredAndReordered.analyze)
+    val correctAnswer = orderedPlan.limit(Literal(10)).analyze
     comparePlans(optimized, correctAnswer)
   }
 
   test("range is already sorted") {
     val inputPlan = Range(1L, 1000L, 1, 10)
-    val orderedPlan = inputPlan.orderBy('id.desc)
-    val optimized = Optimize.execute(analyzer.execute(orderedPlan))
-    val correctAnswer = analyzer.execute(inputPlan)
+    val orderedPlan = inputPlan.orderBy('id.asc)
+    val optimized = Optimize.execute(orderedPlan.analyze)
+    val correctAnswer = inputPlan.analyze
     comparePlans(optimized, correctAnswer)
+
+    val reversedPlan = inputPlan.orderBy('id.desc)
+    val reversedOptimized = Optimize.execute(reversedPlan.analyze)
+    val reversedCorrectAnswer = reversedPlan.analyze
+    comparePlans(reversedOptimized, reversedCorrectAnswer)
+
+    val negativeStepInputPlan = Range(10L, 1L, -1, 10)
+    val negativeStepOrderedPlan = negativeStepInputPlan.orderBy('id.desc)
+    val negativeStepOptimized = Optimize.execute(negativeStepOrderedPlan.analyze)
+    val negativeStepCorrectAnswer = negativeStepInputPlan.analyze
+    comparePlans(negativeStepOptimized, negativeStepCorrectAnswer)
   }
 
   test("sort should not be removed when there is a node which doesn't guarantee any order") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
     val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
-    val optimized = Optimize.execute(analyzer.execute(groupedAndResorted))
-    val correctAnswer = analyzer.execute(groupedAndResorted)
+    val optimized = Optimize.execute(groupedAndResorted.analyze)
+    val correctAnswer = groupedAndResorted.analyze
     comparePlans(optimized, correctAnswer)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a158a42b3df9c..4c454917bdfab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -204,10 +204,10 @@ class PlannerSuite extends SharedSQLContext {
     val resorted = query.sort('key.desc)
     assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty)
     assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
-      (1 to 100).sorted(Ordering[Int].reverse))
+      (1 to 100).reverse)
     // with a different order, the sort is needed
     val sortedAsc = query.sort('key)
-    assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s}.nonEmpty)
+    assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s}.size == 1)
     assert(sortedAsc.select('key).collect().map(_.getInt(0)).toSeq == (1 to 100))
   }
 

From e376c193b44d5293cf9e7075b83149c93d1a9342 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Tue, 10 Apr 2018 17:32:41 +0200
Subject: [PATCH 06/11] fix ut failure

---
 .../test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
index cee85ec8af04d..949505e449fd7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
@@ -39,7 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
     def computeChiSquareTest(): Double = {
       val n = 10000
       // Trigger a sort
-      val data = spark.range(0, n, 1, 1).sort('id)
+      val data = spark.range(0, n, 1, 1).sort('id.desc)
         .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()
 
       // Compute histogram for the number of records per partition post sort
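PATCH 05 corrects an inverted assumption from PATCH 02, where Range always claimed a descending order: a Range emits values in the direction of its step, so Range(1L, 1000L, 1, 10) is ascending and Range(10L, 1L, -1, 10) is descending. PATCH 06 then adjusts ConfigBehaviorSuite, whose ascending sort over an ascending range would now be eliminated and no longer trigger the range-partitioned shuffle the test measures. A small sketch of the step-sign reasoning (plain Scala):

    // Sketch: the natural output ordering of a Range follows the sign of its step.
    object RangeOrderingSketch extends App {
      def rangeValues(start: Long, end: Long, step: Long): Seq[Long] =
        Iterator.iterate(start)(_ + step)
          .takeWhile(v => if (step > 0) v < end else v > end)
          .toSeq

      val asc = rangeValues(1L, 10L, 1L)
      val desc = rangeValues(10L, 1L, -1L)
      println(asc == asc.sorted)           // true: positive step, ascending ids
      println(desc == desc.sorted.reverse) // true: negative step, descending ids
    }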
From 930ef7b6b02352f3447f00310923e1cd3549b537 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Wed, 11 Apr 2018 15:04:05 +0200
Subject: [PATCH 07/11] rename to OrderPreservingUnaryNode

---
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |  2 +-
 .../plans/logical/basicLogicalOperators.scala      | 12 ++++++------
 .../apache/spark/sql/execution/PlannerSuite.scala  |  3 +--
 3 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index dbb09c62a2ab3..42034403d6d03 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -280,6 +280,6 @@ abstract class BinaryNode extends LogicalPlan {
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
 
-abstract class KeepOrderUnaryNode extends UnaryNode {
+abstract class OrderPreservingUnaryNode extends UnaryNode {
   override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ff1409a1fe450..10df504795430 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -43,12 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
 * recognize a subquery as such, and it allows us to write subquery aware transformations.
  */
-case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode {
+case class Subquery(child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
-  extends KeepOrderUnaryNode {
+  extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
   override def maxRows: Option[Long] = child.maxRows
 
@@ -126,7 +126,7 @@ case class Generate(
 }
 
 case class Filter(condition: Expression, child: LogicalPlan)
-  extends KeepOrderUnaryNode with PredicateHelper {
+  extends OrderPreservingUnaryNode with PredicateHelper {
   override def output: Seq[Attribute] = child.output
 
   override def maxRows: Option[Long] = child.maxRows
@@ -739,7 +739,7 @@ object Limit {
  *
 * See [[Limit]] for more information.
  */
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode {
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = {
     limitExpr match {
@@ -755,7 +755,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOr
  *
 * See [[Limit]] for more information.
  */
-case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode {
+case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override def maxRowsPerPartition: Option[Long] = {
@@ -775,7 +775,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrd
 case class SubqueryAlias(
     alias: String,
     child: LogicalPlan)
-  extends KeepOrderUnaryNode {
+  extends OrderPreservingUnaryNode {
 
   override def doCanonicalize(): LogicalPlan = child.canonicalized
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 4c454917bdfab..40915a102bab0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -25,8 +25,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter,
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange,
-  ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
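The rename in PATCH 07 makes the contract explicit: only operators that cannot reorder rows (Subquery, Project, Filter, GlobalLimit, LocalLimit, SubqueryAlias) extend OrderPreservingUnaryNode, while operators such as Aggregate or Repartition keep the Nil default and stop the ordering from propagating. A compact view of that propagation (hypothetical mini hierarchy, not Catalyst):

    // Sketch: ordering flows through order-preserving nodes and stops at
    // nodes that keep the Nil default.
    object OrderPropagationSketch extends App {
      abstract class Plan { def outputOrdering: Seq[String] = Nil }
      abstract class OrderPreservingUnary(val child: Plan) extends Plan {
        final override def outputOrdering: Seq[String] = child.outputOrdering
      }
      final case class Sorted(keys: Seq[String]) extends Plan {
        override def outputOrdering: Seq[String] = keys
      }
      final case class Filter(c: Plan) extends OrderPreservingUnary(c)
      final case class Aggregate(c: Plan) extends Plan // not order-preserving

      println(Filter(Sorted(Seq("a ASC"))).outputOrdering)    // List(a ASC)
      println(Aggregate(Sorted(Seq("a ASC"))).outputOrdering) // List(): blocks removal
    }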
From a1846abcea98c9174b0fa06bbb2dcd5545ef6eb3 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Wed, 11 Apr 2018 15:33:22 +0200
Subject: [PATCH 08/11] fix InMemoryRelation

---
 .../spark/sql/execution/CacheManager.scala    |  4 ++--
 .../execution/columnar/InMemoryRelation.scala | 16 ++++++++--------
 .../columnar/InMemoryColumnarQuerySuite.scala | 10 +++++-----
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d68aeb275afda..a8794be7280c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -99,7 +99,7 @@ class CacheManager extends Logging {
         sparkSession.sessionState.conf.columnBatchSize, storageLevel,
         sparkSession.sessionState.executePlan(planToCache).executedPlan,
         tableName,
-        planToCache.stats)
+        planToCache)
       cachedData.add(CachedData(planToCache, inMemoryRelation))
     }
   }
@@ -148,7 +148,7 @@ class CacheManager extends Logging {
         storageLevel = cd.cachedRepresentation.storageLevel,
         child = spark.sessionState.executePlan(cd.plan).executedPlan,
         tableName = cd.cachedRepresentation.tableName,
-        statsOfPlanToCache = cd.plan.stats)
+        logicalPlan = cd.plan)
       needToRecache += cd.copy(cachedRepresentation = newCache)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 3d27f7f423049..11ac1838aebc1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.LongAccumulator
@@ -38,9 +38,9 @@ object InMemoryRelation {
       storageLevel: StorageLevel,
       child: SparkPlan,
       tableName: Option[String],
-      statsOfPlanToCache: Statistics): InMemoryRelation =
+      logicalPlan: LogicalPlan): InMemoryRelation =
     new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
-      statsOfPlanToCache = statsOfPlanToCache)
+      statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
 }
 
@@ -63,7 +63,8 @@ case class InMemoryRelation(
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
     val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics)
+    statsOfPlanToCache: Statistics,
+    override val outputOrdering: Seq[SortOrder])
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -149,7 +150,7 @@ case class InMemoryRelation(
   def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
     InMemoryRelation(
       newOutput, useCompression, batchSize, storageLevel, child, tableName)(
-      _cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
+      _cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache, outputOrdering)
   }
 
   override def newInstance(): this.type = {
@@ -162,13 +163,12 @@ case class InMemoryRelation(
       tableName)(
         _cachedColumnBuffers,
         sizeInBytesStats,
-        statsOfPlanToCache).asInstanceOf[this.type]
+        statsOfPlanToCache,
+        outputOrdering).asInstanceOf[this.type]
   }
 
   def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
 
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index dc1766fb9a785..8c1494c98e13f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -42,7 +42,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val storageLevel = MEMORY_ONLY
     val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
     val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None,
-      data.logicalPlan.stats)
+      data.logicalPlan)
 
     assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel)
     inMemoryRelation.cachedColumnBuffers.collect().head match {
@@ -119,7 +119,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   test("simple columnar query") {
     val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
-      testData.logicalPlan.stats)
+      testData.logicalPlan)
 
     checkAnswer(scan, testData.collect().toSeq)
   }
@@ -138,7 +138,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val logicalPlan = testData.select('value, 'key).logicalPlan
     val plan = spark.sessionState.executePlan(logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
-      logicalPlan.stats)
+      logicalPlan)
 
     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -155,7 +155,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
-      testData.logicalPlan.stats)
+      testData.logicalPlan)
 
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
@@ -329,7 +329,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   test("SPARK-17549: cached table size should be correctly calculated") {
     val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
     val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
-    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan.stats)
+    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan)
 
     // Materialize the data.
     val expectedAnswer = data.collect()

From 4e441f81af2ed43353224e5a82487a8889d37619 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Wed, 11 Apr 2018 16:22:11 +0200
Subject: [PATCH 09/11] fix build failure

---
 .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index da6ef4a33274e..a7ba9b86a176f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -77,7 +77,8 @@ case class InMemoryRelation(
       tableName = None)(
     _cachedColumnBuffers,
     sizeInBytesStats,
-    statsOfPlanToCache)
+    statsOfPlanToCache,
+    outputOrdering)
 
   override def producedAttributes: AttributeSet = outputSet
 

From 6e95e37d6475cb2ede1e0a8e128d3ee96c4d3273 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Thu, 12 Apr 2018 10:13:47 +0200
Subject: [PATCH 10/11] fix UT

---
 .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index ec96f163aa4ee..9b7b316211d30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -22,6 +22,7 @@ import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec}
 import org.apache.spark.sql.functions._
@@ -455,7 +456,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") {
     val attribute = AttributeReference("a", IntegerType)()
     val localTableScanExec = LocalTableScanExec(Seq(attribute), Nil)
-    val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, localTableScanExec, None, null)
+    val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, localTableScanExec, None,
+      LocalRelation(Seq(attribute), Nil))
     val tableScanExec = InMemoryTableScanExec(Seq(attribute),
       Seq(In(attribute, Nil)), testRelation)
     assert(tableScanExec.partitionFilters.isEmpty)
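PATCH 08 is the key plumbing change: the CacheManager now hands InMemoryRelation the whole logical plan being cached rather than just its precomputed statistics, and the relation derives both statsOfPlanToCache and outputOrdering from it; PATCH 09 and PATCH 10 repair the remaining call sites. A sketch of why passing the plan scales better than threading each property separately (hypothetical types):

    // Sketch: derive cached-relation properties from the plan itself instead
    // of adding one constructor parameter per property.
    object CachedRelationSketch extends App {
      final case class Stats(sizeInBytes: Long)
      final case class Plan(stats: Stats, outputOrdering: Seq[String])

      // Before: every new property (here, ordering) means another parameter
      // to thread through every call site.
      final case class CachedBefore(stats: Stats, outputOrdering: Seq[String])

      // After: one parameter; the relation picks what it needs.
      final case class CachedAfter(plan: Plan) {
        def stats: Stats = plan.stats
        def outputOrdering: Seq[String] = plan.outputOrdering
      }

      val plan = Plan(Stats(1024L), Seq("key DESC"))
      println(CachedAfter(plan).outputOrdering) // List(key DESC): survives caching
    }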
From 6c5f04cb989736ced5d7c8695a0740e512df36c6 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 13 Apr 2018 11:50:58 +0200
Subject: [PATCH 11/11] address comment

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 42c753ee2ef19..5fb59ef350b8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -740,8 +740,7 @@ object EliminateSorts extends Rule[LogicalPlan] {
  */
 object RemoveRedundantSorts extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
-      && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+    case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
       child
   }
 }
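Taken together, the series lets the optimizer drop a logical Sort whenever the child's known outputOrdering already satisfies it, including across a cache boundary. One way to observe the end result from a spark-shell built with these patches (plan shapes are illustrative; the exact explain output depends on the build):

    // Assumes a Spark build containing SPARK-23375; run in spark-shell.
    val df = spark.range(0, 1000)         // Range reports an ascending ordering
    df.sort($"id").explain(true)          // optimized plan: no Sort node expected
    df.sort($"id".desc).explain(true)     // opposite direction: the Sort stays

    val cached = spark.range(0, 1000).sort($"id".desc).cache()
    cached.sort($"id".desc).explain(true) // no re-sort: InMemoryRelation keeps
                                          // the ordering of the plan it cached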