
Commit 53f0a00

chenghao-intel authored and marmbrus committed
[SPARK-4512] [SQL] Unresolved Attribute Exception in Sort By
It causes an exception when running a query like: SELECT key+key FROM src SORT BY value;

Author: Cheng Hao <[email protected]>

Closes #3386 from chenghao-intel/sort and squashes the following commits:

38c78cc [Cheng Hao] revert the SortPartition in SparkStrategies
7e9dd15 [Cheng Hao] update the typo
fcd1d64 [Cheng Hao] rebase the latest master and update the SortBy unit test
1 parent daac221 commit 53f0a00

11 files changed (+55, -31 lines)
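To illustrate the bug this commit fixes, here is a minimal sketch of the failing scenario against the Spark 1.2-era API; the SparkContext `sc` and the Hive table `src(key, value)` are assumed to exist and are not part of the patch:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)  // `sc` is an existing SparkContext (assumed)

// Before this patch the analyzer raised an unresolved-attribute error, because
// `value` is referenced by SORT BY but is not carried by the SELECT projection.
// With the patch the sort reference is resolved and the query runs.
hiveContext.sql("SELECT key + key FROM src SORT BY value").collect()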

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 2 additions & 2 deletions
@@ -204,8 +204,8 @@ class SqlParser extends AbstractSparkSQLParser {
     )
 
   protected lazy val sortType: Parser[LogicalPlan => LogicalPlan] =
-    ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, l) }
-    | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => SortPartitions(o, l) }
+    ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, true, l) }
+    | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, false, l) }
     )
 
   protected lazy val ordering: Parser[Seq[SortOrder]] =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 7 additions & 6 deletions
@@ -246,7 +246,7 @@ class Analyzer(catalog: Catalog,
       case p: LogicalPlan if !p.childrenResolved => p
 
       // If the projection list contains Stars, expand it.
-      case p@Project(projectList, child) if containsStar(projectList) =>
+      case p @ Project(projectList, child) if containsStar(projectList) =>
         Project(
           projectList.flatMap {
             case s: Star => s.expand(child.output, resolver)
@@ -310,7 +310,8 @@ class Analyzer(catalog: Catalog,
    */
   object ResolveSortReferences extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case s @ Sort(ordering, p @ Project(projectList, child)) if !s.resolved && p.resolved =>
+      case s @ Sort(ordering, global, p @ Project(projectList, child))
+          if !s.resolved && p.resolved =>
         val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
         val resolved = unresolved.flatMap(child.resolve(_, resolver))
         val requiredAttributes = AttributeSet(resolved.collect { case a: Attribute => a })
@@ -319,13 +320,14 @@ class Analyzer(catalog: Catalog,
         if (missingInProject.nonEmpty) {
           // Add missing attributes and then project them away after the sort.
           Project(projectList.map(_.toAttribute),
-            Sort(ordering,
+            Sort(ordering, global,
               Project(projectList ++ missingInProject, child)))
         } else {
           logDebug(s"Failed to find $missingInProject in ${p.output.mkString(", ")}")
           s // Nothing we can do here. Return original plan.
         }
-      case s @ Sort(ordering, a @ Aggregate(grouping, aggs, child)) if !s.resolved && a.resolved =>
+      case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
+          if !s.resolved && a.resolved =>
         val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
         // A small hack to create an object that will allow us to resolve any references that
         // refer to named expressions that are present in the grouping expressions.
@@ -340,8 +342,7 @@ class Analyzer(catalog: Catalog,
         if (missingInAggs.nonEmpty) {
           // Add missing grouping exprs and then project them away after the sort.
           Project(a.output,
-            Sort(ordering,
-              Aggregate(grouping, aggs ++ missingInAggs, child)))
+            Sort(ordering, global, Aggregate(grouping, aggs ++ missingInAggs, child)))
         } else {
           s // Nothing we can do here. Return original plan.
         }
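For intuition, the following sketch (not part of the patch) builds the logical plans before and after ResolveSortReferences for SELECT key + key FROM src SORT BY value, using the node constructors shown in this commit; the relation and attribute names are hypothetical test fixtures, and the import paths assume the Spark 1.2 package layout:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project, Sort}
import org.apache.spark.sql.catalyst.types.IntegerType

// Hypothetical stand-in for the table `src` with columns key and value.
val key = AttributeReference("key", IntegerType)()
val value = AttributeReference("value", IntegerType)()
val src = LocalRelation(key, value)
val sum = Alias(Add(key, key), "sum")()

// What the parser hands to the analyzer: the SORT BY references `value`,
// but the projection underneath no longer exposes it.
val before =
  Sort(SortOrder(UnresolvedAttribute("value"), Ascending) :: Nil, false,
    Project(sum :: Nil, src))

// What ResolveSortReferences effectively produces: borrow `value` for the sort,
// then project it away again so the query's output schema is unchanged.
val after =
  Project(sum.toAttribute :: Nil,
    Sort(SortOrder(value, Ascending) :: Nil, false,
      Project(sum :: value :: Nil, src)))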

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 2 additions & 2 deletions
@@ -244,9 +244,9 @@ package object dsl {
         condition: Option[Expression] = None) =
       Join(logicalPlan, otherPlan, joinType, condition)
 
-    def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, logicalPlan)
+    def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, true, logicalPlan)
 
-    def sortBy(sortExprs: SortOrder*) = SortPartitions(sortExprs, logicalPlan)
+    def sortBy(sortExprs: SortOrder*) = Sort(sortExprs, false, logicalPlan)
 
     def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = {
       val aliasedExprs = aggregateExprs.map {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 10 additions & 1 deletion
@@ -130,7 +130,16 @@ case class WriteToFile(
   override def output = child.output
 }
 
-case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode {
+/**
+ * @param order The ordering expressions
+ * @param global True means global sorting apply for entire data set,
+ *               False means sorting only apply within the partition.
+ * @param child Child logical plan
+ */
+case class Sort(
+    order: Seq[SortOrder],
+    global: Boolean,
+    child: LogicalPlan) extends UnaryNode {
   override def output = child.output
 }
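As a quick illustration of the new flag (a sketch, assuming an already-built `order: Seq[SortOrder]` and `child: LogicalPlan`):

// Only the `global` flag distinguishes the two SQL clauses now.
val orderByPlan = Sort(order, global = true, child)   // ORDER BY: one total ordering over the whole data set
val sortByPlan  = Sort(order, global = false, child)  // SORT BY: rows ordered only within each partition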
136145

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 2 additions & 3 deletions
@@ -214,7 +214,7 @@ class SchemaRDD(
    * @group Query
    */
   def orderBy(sortExprs: SortOrder*): SchemaRDD =
-    new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan))
+    new SchemaRDD(sqlContext, Sort(sortExprs, true, logicalPlan))
 
   /**
    * Sorts the results by the given expressions within partition.
@@ -227,7 +227,7 @@ class SchemaRDD(
    * @group Query
    */
   def sortBy(sortExprs: SortOrder*): SchemaRDD =
-    new SchemaRDD(sqlContext, SortPartitions(sortExprs, logicalPlan))
+    new SchemaRDD(sqlContext, Sort(sortExprs, false, logicalPlan))
 
   @deprecated("use limit with integer argument", "1.1.0")
   def limit(limitExpr: Expression): SchemaRDD =
@@ -238,7 +238,6 @@ class SchemaRDD(
    * {{{
    *   schemaRDD.limit(10)
    * }}}
-   *
    * @group Query
    */
   def limit(limitNum: Int): SchemaRDD =
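A brief usage sketch of the two SchemaRDD methods, assuming an existing SQLContext `sqlContext` and a SchemaRDD `people` with a `name` column (both hypothetical); `import sqlContext._` brings in the Symbol-to-attribute implicits used below:

import sqlContext._  // implicit conversions, e.g. 'name -> attribute (sqlContext is assumed)

val globallySorted = people.orderBy('name.asc)   // one total order across all partitions
val locallySorted  = people.sortBy('name.asc)    // ordering guaranteed only within each partition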

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 5 additions & 6 deletions
@@ -190,7 +190,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object TakeOrdered extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
+      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
         execution.TakeOrdered(limit, order, planLater(child)) :: Nil
       case _ => Nil
     }
@@ -257,15 +257,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
 
-      case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
-        execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil
-      case logical.Sort(sortExprs, child) =>
-        execution.Sort(sortExprs, global = true, planLater(child)):: Nil
-
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
         execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
+      case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
+        execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
+      case logical.Sort(sortExprs, global, child) =>
+        execution.Sort(sortExprs, global, planLater(child)):: Nil
       case logical.Project(projectList, child) =>
         execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
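The planner now forwards the `global` flag from the logical Sort straight into either execution.Sort or execution.ExternalSort. A sketch of inspecting this, assuming the Spark 1.2 config key `spark.sql.planner.externalSort` behind `sqlContext.externalSortEnabled` and a registered table `src` (both assumptions, not shown in this diff):

sqlContext.setConf("spark.sql.planner.externalSort", "true")

// With external sort enabled, SORT BY should plan to ExternalSort(..., global = false, ...).
val physicalPlan = sqlContext.sql("SELECT key + key FROM src SORT BY value").queryExecution.executedPlan
println(physicalPlan)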

sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala

Lines changed: 14 additions & 5 deletions
@@ -88,7 +88,7 @@ class DslQuerySuite extends QueryTest {
       Seq(Seq(6)))
   }
 
-  test("sorting") {
+  test("global sorting") {
     checkAnswer(
       testData2.orderBy('a.asc, 'b.asc),
       Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
@@ -122,22 +122,31 @@ class DslQuerySuite extends QueryTest {
       mapData.collect().sortBy(_.data(1)).reverse.toSeq)
   }
 
-  test("sorting #2") {
+  test("partition wide sorting") {
+    // 2 partitions totally, and
+    // Partition #1 with values:
+    //    (1, 1)
+    //    (1, 2)
+    //    (2, 1)
+    // Partition #2 with values:
+    //    (2, 2)
+    //    (3, 1)
+    //    (3, 2)
     checkAnswer(
       testData2.sortBy('a.asc, 'b.asc),
       Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
 
     checkAnswer(
       testData2.sortBy('a.asc, 'b.desc),
-      Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1)))
+      Seq((1,2), (1,1), (2,1), (2,2), (3,2), (3,1)))
 
     checkAnswer(
       testData2.sortBy('a.desc, 'b.desc),
-      Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1)))
+      Seq((2,1), (1,2), (1,1), (3,2), (3,1), (2,2)))
 
     checkAnswer(
       testData2.sortBy('a.desc, 'b.asc),
-      Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
+      Seq((2,1), (1,1), (1,2), (3,1), (3,2), (2,2)))
   }
 
   test("limit") {

sql/core/src/test/scala/org/apache/spark/sql/TestData.scala

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ object TestData {
       TestData2(2, 1) ::
       TestData2(2, 2) ::
       TestData2(3, 1) ::
-      TestData2(3, 2) :: Nil).toSchemaRDD
+      TestData2(3, 2) :: Nil, 2).toSchemaRDD
   testData2.registerTempTable("testData2")
 
   case class DecimalData(a: BigDecimal, b: BigDecimal)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 4 additions & 4 deletions
@@ -680,16 +680,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         val withSort =
           (orderByClause, sortByClause, distributeByClause, clusterByClause) match {
             case (Some(totalOrdering), None, None, None) =>
-              Sort(totalOrdering.getChildren.map(nodeToSortOrder), withHaving)
+              Sort(totalOrdering.getChildren.map(nodeToSortOrder), true, withHaving)
             case (None, Some(perPartitionOrdering), None, None) =>
-              SortPartitions(perPartitionOrdering.getChildren.map(nodeToSortOrder), withHaving)
+              Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
             case (None, None, Some(partitionExprs), None) =>
               Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
             case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
-              SortPartitions(perPartitionOrdering.getChildren.map(nodeToSortOrder),
+              Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
                 Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, Some(clusterExprs)) =>
-              SortPartitions(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)),
+              Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
                 Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, None) => withHaving
             case _ => sys.error("Unsupported set of ordering / distribution clauses.")
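For reference, a sketch of how the four clause combinations now translate, reusing the hypothetical hiveContext and src(key, value) table from the sketch near the top; the plan shapes in the comments follow directly from the match above:

hiveContext.sql("SELECT key, value FROM src ORDER BY value")                   // Sort(..., true, plan)
hiveContext.sql("SELECT key, value FROM src SORT BY value")                    // Sort(..., false, plan)
hiveContext.sql("SELECT key, value FROM src DISTRIBUTE BY key SORT BY value")  // Sort(..., false, Repartition(key, plan))
hiveContext.sql("SELECT key, value FROM src CLUSTER BY key")                   // Sort(key ASC, false, Repartition(key, plan))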

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala

Lines changed: 1 addition & 1 deletion
@@ -132,7 +132,7 @@ abstract class HiveComparisonTest
 
     def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
-      case PhysicalOperation(_, _, Sort(_, _)) => true
+      case PhysicalOperation(_, _, Sort(_, true, _)) => true
       case _ => plan.children.iterator.exists(isSorted)
     }
