[SPARK-9251][SQL] do not order by expressions which still need evaluation (#7593)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

@@ -79,6 +79,7 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
UnresolvedHavingClauseAttributes ::
RemoveEvaluationFromSort ::
Contributor:

This even introduces complexity.

I'm wondering what the reason is for doing this?

Contributor Author:

The original motivation was adding a Project to materialize nondeterministic expressions in ORDER BY, to avoid extra evaluations that lead to wrong answers; see the JIRA. In an offline discussion we decided to apply this rule to all ordering expressions that still need evaluation. But now I think it may be overkill. @rxin, what do you think?
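For context, a minimal self-contained sketch of the issue in plain Scala (not Spark code; expr and rows are made-up stand-ins): a nondeterministic expression returns a different value on every evaluation, so it is only safe as a sort key if it is evaluated exactly once per row and the result is materialized before sorting, which is what the inner/outer Project pair is meant to guarantee.

import scala.util.Random

object SortKeySketch extends App {
  val rng = new Random(33)
  def expr(row: Int): Double = rng.nextDouble() // stands in for rand() in an ORDER BY clause

  val rows = Seq(1, 2, 3, 4, 5)

  // Materialize the key once per row (the inner Project), sort by the cached key,
  // then drop the key again (the outer Project).
  val sorted = rows.map(r => (r, expr(r))).sortBy(_._2).map(_._1)
  println(sorted)
}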

Contributor:

The optimal approach would be a perfect cost model that can predict what we are trading off (network vs. CPU). Absent that, I think always projecting is the approach that makes more sense in the common cases, because:

  1. It is hard to quantify the difference.
  2. I/O (network, disk) is rarely the bottleneck here, especially with more SSDs and 10Gbps networks.
  3. Most of the time ORDER BY just orders by a field, and this won't hurt that case.
  4. If there is a complex expression, doing the evaluation many times during sorting is bad.

The alternative, which is probably even better, is for the sorter itself to always project out the sort key. It might make more sense there, but it is slightly more complicated to write, I think.
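A rough sketch of that alternative in generic Scala (not the Spark sorter; the helper name is invented): the sorter itself extracts and caches the key once per row, sorts on the cached keys, and drops them afterwards, so callers never pay for repeated key evaluation.

// Generic "sorter projects out the sort key" helper, essentially a Schwartzian transform.
def sortByProjectedKey[T, K: Ordering](rows: Seq[T])(key: T => K): Seq[T] =
  rows
    .map(r => (key(r), r)) // evaluate the sort key exactly once per row
    .sortBy(_._1)          // compare only the cached keys
    .map(_._2)             // discard the keys again

// Example: sort strings by a key without recomputing it on every comparison.
val byLength = sortByProjectedKey(Seq("bbb", "a", "cc"))(_.length)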

HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
@@ -947,6 +948,63 @@ class Analyzer(
Project(p.output, newPlan.withNewChildren(newChild :: Nil))
}
}

/**
* Removes from the sort all ordering expressions that still need evaluation, uses an inner
* Project to materialize them, and finally uses an outer Project to project them away so the
* result stays the same. This way we can make sure we only sort by [[AttributeReference]]s.
*
* As an example,
* {{{
* Sort('a, 'b + 1,
* Relation('a, 'b))
* }}}
* will be turned into:
* {{{
* Project('a, 'b,
* Sort('a, '_sortCondition,
* Project('a, 'b, ('b + 1).as("_sortCondition"),
* Relation('a, 'b))))
* }}}
*/
object RemoveEvaluationFromSort extends Rule[LogicalPlan] {
Contributor:

This seems like an optimization, not something to be done in analysis. We should always ask "Can the query be executed correctly without this rule?" to decide whether it belongs in the Analyzer.

Contributor:

Ah, after talking to @rxin I see he does plan to make this a requirement for execution.

Contributor:

And you even made it a condition of resolved :) !

private def hasAlias(expr: Expression) = {
expr.find {
case a: Alias => true
case _ => false
}.isDefined
}

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// The ordering expressions have no effect on the output schema of `Sort`,
// so `Alias`es in ordering expressions are unnecessary and we should remove them.
case s @ Sort(ordering, _, _) if ordering.exists(hasAlias) =>
Contributor:

This check is probably more expensive than just doing the transformation unconditionally. If it's a no-op we will detect that through reference equality.
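A minimal sketch of that suggestion with made-up types (not Catalyst code): run the rewrite unconditionally and treat reference equality of the results as the no-op signal, instead of paying for an up-front exists(hasAlias) scan.

sealed trait Expr
final case class Col(name: String) extends Expr
final case class Aliased(child: Expr, name: String) extends Expr

def stripAlias(e: Expr): Expr = e match {
  case Aliased(child, _) => stripAlias(child)
  case other             => other // an unchanged input comes back as the same object
}

def stripAliases(ordering: List[Expr]): List[Expr] = {
  val rewritten = ordering.map(stripAlias)
  val unchanged = rewritten.zip(ordering).forall { case (a, b) => a eq b }
  if (unchanged) ordering else rewritten // the no-op is detected purely by reference equality
}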

Contributor:

Maybe also add a test to make sure we don't project unnecessarily when there is an alias?
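One possible shape for such a test, following the style of the AnalysisSuite case added below (helper names assumed from that file, and the expected plan is only my reading of the rule): an ordering that merely wraps an attribute in an alias should lose the alias without gaining the inner/outer Project pair.

test("do not add projects when ordering only needs alias removal") {
  val b = testRelation2.output(1)
  val plan = Sort(Seq(SortOrder(Alias(b, "name")(), Ascending)), false, testRelation2)
  val expected = Sort(Seq(SortOrder(b, Ascending)), false, testRelation2)
  checkAnalysis(plan, expected)
}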

val newOrdering = ordering.map(_.transformUp {
case Alias(child, _) => child
}.asInstanceOf[SortOrder])
s.copy(order = newOrdering)

case s @ Sort(ordering, global, child)
if s.expressions.forall(_.resolved) && s.childrenResolved && !s.hasNoEvaluation =>

val (ref, needEval) = ordering.partition(_.child.isInstanceOf[AttributeReference])

val namedExpr = needEval.map(_.child match {
case n: NamedExpression => n
case e => Alias(e, "_sortCondition")()
Member:

Is it possible that we will have multiple conditions that need to be aliased?

Contributor Author:

It's definitely possible, but the alias name here doesn't matter: we'll call toAttribute later, which binds it by expression id.
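A small illustration of that point, written against the Catalyst expressions used in this patch (the exact behavior of expression-id allocation is assumed from NamedExpression): two aliases can share the display name, but each gets its own expression id, and toAttribute carries that id, so every SortOrder binds to its own materialized column.

import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}

val a1 = Alias(Literal(1), "_sortCondition")()
val a2 = Alias(Literal(2), "_sortCondition")()

// Same name, different expression ids: the attributes stay distinguishable after the Project.
assert(a1.name == a2.name)
assert(a1.toAttribute.exprId != a2.toAttribute.exprId)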

})

val newOrdering = ref ++ needEval.zip(namedExpr).map { case (order, ne) =>
Contributor:

I think there is a bug here. We need to consider the original ordering of the expressions or we might sort differently. Consider:

Seq((1,2,3,4)).toDF("a", "b", "c", "d").registerTempTable("test")

scala> sqlContext.sql("SELECT * FROM test ORDER BY a + 1, b").queryExecution
== Parsed Logical Plan ==
'Sort [('a + 1) ASC,'b ASC], true
 'Project [unresolvedalias(*)]
  'UnresolvedRelation [test], None

== Optimized Logical Plan ==
Project [a#4,b#5,c#6,d#7]
 Sort [b#5 ASC,_sortCondition#9 ASC], true
  LocalRelation [a#4,b#5,c#6,d#7,_sortCondition#9], [[1,2,3,4,2]]

Notice how we are now sorting by b before a + 1.

Contributor:

I have created https://issues.apache.org/jira/browse/SPARK-9512 to track the bug-fix work.

Contributor Author:

Sorry I was terribly busy these days... thanks for the JIRA! I'll work on it later.

Contributor:

Oh, it has not been fixed yet. Will fix it before the 1.5 release.

order.copy(child = ne.toAttribute)
}

// Materialize the ordering expressions that still need evaluation in an inner Project,
// then project them away again after the sort.
Project(child.output,
Sort(newOrdering, global,
Project(child.output ++ namedExpr, child)))
}
}
}

/**
@@ -66,7 +66,7 @@ case class Rand(seed: Long) extends RDG {
val rngTerm = ctx.freshName("rng")
val className = classOf[XORShiftRandom].getName
ctx.addMutableState(className, rngTerm,
s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());")
s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
Contributor Author:

This is an existing small bug: sometimes the seed is large and cannot be represented as an int literal, so we add an L at the end to make it a long literal.
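A tiny standalone sketch of the failure mode (the seed value is made up; the generated code is just a string here): without the L suffix, a seed above Int.MaxValue gets spliced into the generated Java source as an invalid int literal.

val seed = 10000000000L // larger than Int.MaxValue

// Old form renders as "new XORShiftRandom(10000000000 + ...)", which does not compile:
// the int literal is out of range.
println(s"new XORShiftRandom($seed + org.apache.spark.TaskContext.getPartitionId());")

// Fixed form renders as "new XORShiftRandom(10000000000L + ...)", a valid long literal.
println(s"new XORShiftRandom(${seed}L + org.apache.spark.TaskContext.getPartitionId());")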

ev.isNull = "false"
s"""
final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextDouble();
@@ -89,7 +89,7 @@ case class Randn(seed: Long) extends RDG {
val rngTerm = ctx.freshName("rng")
val className = classOf[XORShiftRandom].getName
ctx.addMutableState(className, rngTerm,
s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());")
s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
ev.isNull = "false"
s"""
final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextGaussian();
@@ -33,7 +33,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
}.nonEmpty
)

!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions
}
}

@@ -67,7 +67,7 @@ case class Generate(
generator.resolved &&
childrenResolved &&
generator.elementTypes.length == generatorOutput.length &&
!generatorOutput.exists(!_.resolved)
generatorOutput.forall(_.resolved)
}

// we don't want the gOutput to be taken as part of the expressions
@@ -187,7 +187,7 @@ case class WithWindowDefinition(
}

/**
* @param order The ordering expressions
* @param order The ordering expressions, which should all be [[AttributeReference]]s
* @param global True means global sorting applies to the entire data set,
* False means sorting only applies within each partition.
* @param child Child logical plan
@@ -197,6 +197,11 @@ case class Sort(
global: Boolean,
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

def hasNoEvaluation: Boolean = order.forall(_.child.isInstanceOf[AttributeReference])

override lazy val resolved: Boolean =
expressions.forall(_.resolved) && childrenResolved && hasNoEvaluation
}

case class Aggregate(
@@ -211,7 +216,7 @@ case class Aggregate(
}.nonEmpty
)

!expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions
}

override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
@@ -165,11 +165,39 @@ class AnalysisSuite extends AnalysisTest {

test("pull out nondeterministic expressions from Sort") {
val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation)
val projected = Alias(Rand(33), "_nondeterministic")()
val analyzed = caseSensitiveAnalyzer.execute(plan)
analyzed.transform {
case s: Sort if s.expressions.exists(!_.deterministic) =>
fail("nondeterministic expressions are not allowed in Sort")
}
}

test("remove still-need-evaluate ordering expressions from sort") {
val a = testRelation2.output(0)
val b = testRelation2.output(1)

def makeOrder(e: Expression): SortOrder = SortOrder(e, Ascending)

val noEvalOrdering = makeOrder(a)
val noEvalOrderingWithAlias = makeOrder(Alias(Alias(b, "name1")(), "name2")())

val needEvalExpr = Coalesce(Seq(a, Literal("1")))
val needEvalExpr2 = Coalesce(Seq(a, b))
val needEvalOrdering = makeOrder(needEvalExpr)
val needEvalOrdering2 = makeOrder(needEvalExpr2)

val plan = Sort(
Seq(noEvalOrdering, noEvalOrderingWithAlias, needEvalOrdering, needEvalOrdering2),
false, testRelation2)

val evaluatedOrdering = makeOrder(AttributeReference("_sortCondition", StringType)())
val materializedExprs = Seq(needEvalExpr, needEvalExpr2).map(e => Alias(e, "_sortCondition")())

val expected =
Project(testRelation.output,
Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false,
Project(testRelation.output :+ projected, testRelation)))
Project(testRelation2.output,
Sort(Seq(makeOrder(a), makeOrder(b), evaluatedOrdering, evaluatedOrdering), false,
Project(testRelation2.output ++ materializedExprs, testRelation2)))

checkAnalysis(plan, expected)
}
}
sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -17,8 +17,6 @@

package org.apache.spark.sql

import java.sql.Timestamp

import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.test._
