Commit 7db5339

Eric Liang authored and marmbrus committed
[SPARK-3349][SQL] Output partitioning of limit should not be inherited from child
This resolves https://issues.apache.org/jira/browse/SPARK-3349

Author: Eric Liang <[email protected]>

Closes apache#2262 from ericl/spark-3349 and squashes the following commits:

3e1b05c [Eric Liang] add regression test
ac32723 [Eric Liang] make limit/takeOrdered output SinglePartition
1 parent 08ce188 · commit 7db5339
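Why the inherited claim is dangerous: the physical planner decides where to insert exchanges (shuffles) by trusting each operator's advertised outputPartitioning. A Limit physically collapses its input into a single partition, so if it keeps advertising the child's hash partitioning (for example, the one left over from a DISTINCT), a downstream equi-join can be planned without the shuffle it actually needs and silently drop rows. The sketch below is an illustrative model only, not Spark's planner code; Partitioning, HashPartitioning and canSkipShuffle are simplified stand-ins.

// Illustrative model only -- these are not the real Spark SQL classes or planner logic.
sealed trait Partitioning
case class HashPartitioning(columns: Seq[String], numPartitions: Int) extends Partitioning
case object SinglePartition extends Partitioning

object Spark3349Sketch extends App {
  // Hypothetical rule, loosely modeled on exchange planning for an equi-join:
  // skip the shuffle only when both sides already claim the same hash
  // partitioning on the join keys.
  def canSkipShuffle(left: Partitioning, right: Partitioning, joinKeys: Seq[String]): Boolean =
    (left, right) match {
      case (HashPartitioning(lCols, lNum), HashPartitioning(rCols, rNum)) =>
        lCols == joinKeys && rCols == joinKeys && lNum == rNum
      case _ => false
    }

  // Before this patch, Limit inherited its child's claim (hash-partitioned on "n"),
  // even though it physically emits a single partition.
  val staleLimitClaim: Partitioning = HashPartitioning(Seq("n"), numPartitions = 200)
  val otherJoinSide: Partitioning   = HashPartitioning(Seq("n"), numPartitions = 200)

  // The join would be planned without a shuffle, pairing up partitions that no
  // longer line up with where the limited rows actually live.
  assert(canSkipShuffle(staleLimitClaim, otherJoinSide, Seq("n")))

  // After the patch, Limit reports SinglePartition, so the shuffle cannot be skipped.
  assert(!canSkipShuffle(SinglePartition, otherJoinSide, Seq("n")))
}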

2 files changed: +20 −1 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 3 additions & 1 deletion

@@ -27,7 +27,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair

 /**
@@ -100,6 +100,7 @@ case class Limit(limit: Int, child: SparkPlan)
   private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

   override def output = child.output
+  override def outputPartitioning = SinglePartition

   /**
    * A custom implementation modeled after the take function on RDDs but which never runs any job
@@ -173,6 +174,7 @@ case class Limit(limit: Int, child: SparkPlan)
 case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {

   override def output = child.output
+  override def outputPartitioning = SinglePartition

   val ordering = new RowOrdering(sortOrder, child.output)

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 17 additions & 0 deletions

@@ -359,6 +359,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       (null, null, 6, "F") :: Nil)
   }

+  test("SPARK-3349 partitioning after limit") {
+    sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC")
+      .limit(2)
+      .registerTempTable("subset1")
+    sql("SELECT DISTINCT n FROM lowerCaseData")
+      .limit(2)
+      .registerTempTable("subset2")
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INNER JOIN subset1 ON subset1.n = lowerCaseData.n"),
+      (3, "c", 3) ::
+      (4, "d", 4) :: Nil)
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INNER JOIN subset2 ON subset2.n = lowerCaseData.n"),
+      (1, "a", 1) ::
+      (2, "b", 2) :: Nil)
+  }
+
   test("mixed-case keywords") {
     checkAnswer(
       sql(
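The regression test above exercises the bad plan through the join results. As an additional, hedged spot-check (not part of this patch), code inside the org.apache.spark.sql test suite, where SchemaRDD's queryExecution is accessible, could also inspect the partitioning the planner now sees; this assumes the 1.1-era SchemaRDD API and the same lowerCaseData test table.

// Hypothetical spot-check, assuming the suite's TestSQLContext imports and lowerCaseData.
val limited = sql("SELECT DISTINCT n FROM lowerCaseData").limit(2)
// Before this commit this reported the partitioning inherited from the child;
// with the override it should report SinglePartition.
println(limited.queryExecution.executedPlan.outputPartitioning)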
