Skip to content

Commit 708794e

Browse files
cloud-fanrxin
authored andcommitted
[SPARK-9251][SQL] do not order by expressions which still need evaluation
as an offline discussion with rxin , it's weird to be computing stuff while doing sorting, we should only order by bound reference during execution. Author: Wenchen Fan <[email protected]> Closes apache#7593 from cloud-fan/sort and squashes the following commits: 7b1bef7 [Wenchen Fan] add test daf206d [Wenchen Fan] add more comments 289bee0 [Wenchen Fan] do not order by expressions which still need evaluation
1 parent 15667a0 commit 708794e

File tree

5 files changed

+101
-12
lines changed

5 files changed

+101
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class Analyzer(
7979
ExtractWindowExpressions ::
8080
GlobalAggregates ::
8181
UnresolvedHavingClauseAttributes ::
82+
RemoveEvaluationFromSort ::
8283
HiveTypeCoercion.typeCoercionRules ++
8384
extendedResolutionRules : _*),
8485
Batch("Nondeterministic", Once,
@@ -947,6 +948,63 @@ class Analyzer(
947948
Project(p.output, newPlan.withNewChildren(newChild :: Nil))
948949
}
949950
}
951+
952+
/**
953+
* Removes all still-need-evaluate ordering expressions from sort and use an inner project to
954+
* materialize them, finally use a outer project to project them away to keep the result same.
955+
* Then we can make sure we only sort by [[AttributeReference]]s.
956+
*
957+
* As an example,
958+
* {{{
959+
* Sort('a, 'b + 1,
960+
* Relation('a, 'b))
961+
* }}}
962+
* will be turned into:
963+
* {{{
964+
* Project('a, 'b,
965+
* Sort('a, '_sortCondition,
966+
* Project('a, 'b, ('b + 1).as("_sortCondition"),
967+
* Relation('a, 'b))))
968+
* }}}
969+
*/
970+
object RemoveEvaluationFromSort extends Rule[LogicalPlan] {
971+
private def hasAlias(expr: Expression) = {
972+
expr.find {
973+
case a: Alias => true
974+
case _ => false
975+
}.isDefined
976+
}
977+
978+
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
979+
// The ordering expressions have no effect to the output schema of `Sort`,
980+
// so `Alias`s in ordering expressions are unnecessary and we should remove them.
981+
case s @ Sort(ordering, _, _) if ordering.exists(hasAlias) =>
982+
val newOrdering = ordering.map(_.transformUp {
983+
case Alias(child, _) => child
984+
}.asInstanceOf[SortOrder])
985+
s.copy(order = newOrdering)
986+
987+
case s @ Sort(ordering, global, child)
988+
if s.expressions.forall(_.resolved) && s.childrenResolved && !s.hasNoEvaluation =>
989+
990+
val (ref, needEval) = ordering.partition(_.child.isInstanceOf[AttributeReference])
991+
992+
val namedExpr = needEval.map(_.child match {
993+
case n: NamedExpression => n
994+
case e => Alias(e, "_sortCondition")()
995+
})
996+
997+
val newOrdering = ref ++ needEval.zip(namedExpr).map { case (order, ne) =>
998+
order.copy(child = ne.toAttribute)
999+
}
1000+
1001+
// Add still-need-evaluate ordering expressions into inner project and then project
1002+
// them away after the sort.
1003+
Project(child.output,
1004+
Sort(newOrdering, global,
1005+
Project(child.output ++ namedExpr, child)))
1006+
}
1007+
}
9501008
}
9511009

9521010
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ case class Rand(seed: Long) extends RDG {
6666
val rngTerm = ctx.freshName("rng")
6767
val className = classOf[XORShiftRandom].getName
6868
ctx.addMutableState(className, rngTerm,
69-
s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());")
69+
s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
7070
ev.isNull = "false"
7171
s"""
7272
final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextDouble();
@@ -89,7 +89,7 @@ case class Randn(seed: Long) extends RDG {
8989
val rngTerm = ctx.freshName("rng")
9090
val className = classOf[XORShiftRandom].getName
9191
ctx.addMutableState(className, rngTerm,
92-
s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());")
92+
s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
9393
ev.isNull = "false"
9494
s"""
9595
final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextGaussian();

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
3333
}.nonEmpty
3434
)
3535

36-
!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
36+
expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions
3737
}
3838
}
3939

@@ -67,7 +67,7 @@ case class Generate(
6767
generator.resolved &&
6868
childrenResolved &&
6969
generator.elementTypes.length == generatorOutput.length &&
70-
!generatorOutput.exists(!_.resolved)
70+
generatorOutput.forall(_.resolved)
7171
}
7272

7373
// we don't want the gOutput to be taken as part of the expressions
@@ -187,7 +187,7 @@ case class WithWindowDefinition(
187187
}
188188

189189
/**
190-
* @param order The ordering expressions
190+
* @param order The ordering expressions, should all be [[AttributeReference]]
191191
* @param global True means global sorting apply for entire data set,
192192
* False means sorting only apply within the partition.
193193
* @param child Child logical plan
@@ -197,6 +197,11 @@ case class Sort(
197197
global: Boolean,
198198
child: LogicalPlan) extends UnaryNode {
199199
override def output: Seq[Attribute] = child.output
200+
201+
def hasNoEvaluation: Boolean = order.forall(_.child.isInstanceOf[AttributeReference])
202+
203+
override lazy val resolved: Boolean =
204+
expressions.forall(_.resolved) && childrenResolved && hasNoEvaluation
200205
}
201206

202207
case class Aggregate(
@@ -211,7 +216,7 @@ case class Aggregate(
211216
}.nonEmpty
212217
)
213218

214-
!expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
219+
expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions
215220
}
216221

217222
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,39 @@ class AnalysisSuite extends AnalysisTest {
165165

166166
test("pull out nondeterministic expressions from Sort") {
167167
val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation)
168-
val projected = Alias(Rand(33), "_nondeterministic")()
168+
val analyzed = caseSensitiveAnalyzer.execute(plan)
169+
analyzed.transform {
170+
case s: Sort if s.expressions.exists(!_.deterministic) =>
171+
fail("nondeterministic expressions are not allowed in Sort")
172+
}
173+
}
174+
175+
test("remove still-need-evaluate ordering expressions from sort") {
176+
val a = testRelation2.output(0)
177+
val b = testRelation2.output(1)
178+
179+
def makeOrder(e: Expression): SortOrder = SortOrder(e, Ascending)
180+
181+
val noEvalOrdering = makeOrder(a)
182+
val noEvalOrderingWithAlias = makeOrder(Alias(Alias(b, "name1")(), "name2")())
183+
184+
val needEvalExpr = Coalesce(Seq(a, Literal("1")))
185+
val needEvalExpr2 = Coalesce(Seq(a, b))
186+
val needEvalOrdering = makeOrder(needEvalExpr)
187+
val needEvalOrdering2 = makeOrder(needEvalExpr2)
188+
189+
val plan = Sort(
190+
Seq(noEvalOrdering, noEvalOrderingWithAlias, needEvalOrdering, needEvalOrdering2),
191+
false, testRelation2)
192+
193+
val evaluatedOrdering = makeOrder(AttributeReference("_sortCondition", StringType)())
194+
val materializedExprs = Seq(needEvalExpr, needEvalExpr2).map(e => Alias(e, "_sortCondition")())
195+
169196
val expected =
170-
Project(testRelation.output,
171-
Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false,
172-
Project(testRelation.output :+ projected, testRelation)))
197+
Project(testRelation2.output,
198+
Sort(Seq(makeOrder(a), makeOrder(b), evaluatedOrdering, evaluatedOrdering), false,
199+
Project(testRelation2.output ++ materializedExprs, testRelation2)))
200+
173201
checkAnalysis(plan, expected)
174202
}
175203
}

sql/core/src/test/scala/org/apache/spark/sql/TestData.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql
1919

20-
import java.sql.Timestamp
21-
2220
import org.apache.spark.sql.test.TestSQLContext.implicits._
2321
import org.apache.spark.sql.test._
2422

0 commit comments

Comments
 (0)