Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
5199d49
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 22, 2016
404214c
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 23, 2016
c001dd9
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 25, 2016
59daa48
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 5, 2016
41d5f64
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 7, 2016
472a6e3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 10, 2016
0fba10a
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 12, 2016
95f25a6
group by ordinals
gatorsmile Mar 20, 2016
a927376
Merge remote-tracking branch 'upstream/master' into groupByOrdinal
gatorsmile Mar 20, 2016
b10d076
fix messages.
gatorsmile Mar 20, 2016
79a537a
'*' is not allowed to use in the select list when users specify ordin…
gatorsmile Mar 20, 2016
a1835e5
address comments.
gatorsmile Mar 21, 2016
cbf73b3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 21, 2016
c711347
Merge branch 'groupByOrdinal' into groupByOrdinalNew
gatorsmile Mar 21, 2016
960ead7
merge two ordinal resolution into the same one.
gatorsmile Mar 21, 2016
b61345b
address comments.
gatorsmile Mar 21, 2016
c08f561
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
18bab66
Merge branch 'groupByOrdinalNew' into groupByOrdinalNewNew
gatorsmile Mar 22, 2016
dacf2d8
temp fix.
gatorsmile Mar 22, 2016
b19b73c
fixed an issue in star expansion for group by
gatorsmile Mar 22, 2016
474df88
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
74a16be
address comments.
gatorsmile Mar 23, 2016
3d9828d
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 24, 2016
a06c4ce
Merge branch 'groupByOrdinalNewNew' into groupByOrdinalNewNewNew
gatorsmile Mar 24, 2016
6d08009
address comments.
gatorsmile Mar 25, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean

def orderByOrdinal: Boolean
def groupByOrdinal: Boolean

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determin if two
Expand All @@ -48,11 +49,16 @@ object EmptyConf extends CatalystConf {
override def orderByOrdinal: Boolean = {
throw new UnsupportedOperationException
}
override def groupByOrdinal: Boolean = {
throw new UnsupportedOperationException
}
}

/** A CatalystConf that can be used for local testing. */
case class SimpleCatalystConf(
caseSensitiveAnalysis: Boolean,
orderByOrdinal: Boolean = true)
orderByOrdinal: Boolean = true,
groupByOrdinal: Boolean = true)

extends CatalystConf {
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class Analyzer(
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveUpCast ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveSortReferences ::
ResolveGenerate ::
ResolveFunctions ::
Expand Down Expand Up @@ -385,7 +386,13 @@ class Analyzer(
p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
// If the aggregate function argument contains Stars, expand it.
case a: Aggregate if containsStar(a.aggregateExpressions) =>
a.copy(aggregateExpressions = buildExpandedProjectList(a.aggregateExpressions, a.child))
if (conf.groupByOrdinal && a.groupingExpressions.exists(IntegerIndex.unapply(_).nonEmpty)) {
failAnalysis(
"Group by position: star is not allowed to use in the select list " +
"when using ordinals in group by")
} else {
a.copy(aggregateExpressions = buildExpandedProjectList(a.aggregateExpressions, a.child))
}
// If the script transformation input contains Stars, expand it.
case t: ScriptTransformation if containsStar(t.input) =>
t.copy(
Expand Down Expand Up @@ -634,21 +641,23 @@ class Analyzer(
}
}

/**
* In many dialects of SQL it is valid to sort by attributes that are not present in the SELECT
* clause. This rule detects such queries and adds the required attributes to the original
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
*
* This rule also resolves the position number in sort references. This support is introduced
* in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting.
* - When the sort references are not integer but foldable expressions, ignore them.
* - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too.
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
/**
* In many dialects of SQL it is valid to use ordinal positions in order/sort by and group by
* clauses. This rule is to convert ordinal positions to the corresponding expressions in the
* select list. This support is introduced in Spark 2.0.
*
* - When the sort references or group by expressions are not integer but foldable expressions,
* just ignore them.
* - When spark.sql.orderByOrdinal/spark.sql.groupByOrdinal is set to false, ignore the position
* numbers too.
*
* Before the release of Spark 2.0, the literals in order/sort by and group by clauses
* have no effect on the results.
*/
object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case s: Sort if !s.child.resolved => s
// Replace the index with the related attribute for ORDER BY
case p if !p.childrenResolved => p
// Replace the index with the related attribute for ORDER BY,
// which is a 1-base position of the projection list.
case s @ Sort(orders, global, child)
if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) =>
Expand All @@ -665,10 +674,41 @@ class Analyzer(
}
Sort(newOrders, global, child)

// Replace the index with the corresponding expression in aggregateExpressions. The index is
// a 1-base position of aggregateExpressions, which is output columns (select expression)
case a @ Aggregate(groups, aggs, child)
if conf.groupByOrdinal && aggs.forall(_.resolved) &&
groups.exists(IntegerIndex.unapply(_).nonEmpty) =>
val newGroups = groups.map {
case IntegerIndex(index) if index > 0 && index <= aggs.size =>
aggs(index - 1) match {
case e if ResolveAggregateFunctions.containsAggregate(e) =>
throw new UnresolvedException(a,
s"Group by position: the '$index'th column in the select contains an " +
s"aggregate function: ${e.sql}. Aggregate functions are not allowed in GROUP BY")
case o => o
}
case IntegerIndex(index) =>
throw new UnresolvedException(a,
s"Group by position: '$index' exceeds the size of the select list '${aggs.size}'.")
case o => o
}
Aggregate(newGroups, aggs, child)
}
}

/**
* In many dialects of SQL it is valid to sort by attributes that are not present in the SELECT
* clause. This rule detects such queries and adds the required attributes to the original
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
case sa @ Sort(_, _, child: Aggregate) => sa

case s @ Sort(order, _, child) if !s.resolved =>
case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
try {
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ object Unions {
object IntegerIndex {
def unapply(a: Any): Option[Int] = a match {
case Literal(a: Int, IntegerType) => Some(a)
// When resolving ordinal in Sort, negative values are extracted for issuing error messages.
// When resolving ordinal in Sort and Group By, negative values are extracted
// for issuing error messages.
case UnaryMinus(IntegerLiteral(v)) => Some(-v)
case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,11 @@ object SQLConf {
doc = "When true, the ordinal numbers are treated as the position in the select list. " +
"When false, the ordinal numbers in order/sort By clause are ignored.")

val GROUP_BY_ORDINAL = booleanConf("spark.sql.groupByOrdinal",
defaultValue = Some(true),
doc = "When true, the ordinal numbers in group by clauses are treated as the position " +
"in the select list. When false, the ordinal numbers are ignored.")

// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
//
Expand Down Expand Up @@ -668,6 +673,7 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin

override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
92 changes: 85 additions & 7 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.sql.Timestamp
import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
import org.apache.spark.sql.functions._
Expand Down Expand Up @@ -459,25 +460,103 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Seq(Row(1, 3), Row(2, 3), Row(3, 3)))
}

test("literal in agg grouping expressions") {
test("Group By Ordinal - basic") {
checkAnswer(
sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"),
Seq(Row(1, 2), Row(2, 2), Row(3, 2)))
checkAnswer(
sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"),
Seq(Row(1, 2), Row(2, 2), Row(3, 2)))
sql("SELECT a, sum(b) FROM testData2 GROUP BY 1"),
sql("SELECT a, sum(b) FROM testData2 GROUP BY a"))

// duplicate group-by columns
checkAnswer(
sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1"),
sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))

checkAnswer(
sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY 1, 2"),
sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))
}

test("Group By Ordinal - non aggregate expressions") {
checkAnswer(
sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, 2"),
sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2"))

checkAnswer(
sql("SELECT a, b + 2 as c, count(2) FROM testData2 GROUP BY a, 2"),
sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2"))
}

test("Group By Ordinal - non-foldable constant expression") {
checkAnswer(
sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b, 1 + 0"),
sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b"))

checkAnswer(
sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1 + 2"),
sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))
}

test("Group By Ordinal - alias") {
checkAnswer(
sql("SELECT a, (b + 2) as c, count(2) FROM testData2 GROUP BY a, 2"),
sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2"))

checkAnswer(
sql("SELECT a as b, b as a, sum(b) FROM testData2 GROUP BY 1, 2"),
sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b"))
}

test("Group By Ordinal - constants") {
checkAnswer(
sql("SELECT 1, 2, sum(b) FROM testData2 GROUP BY 1, 2"),
sql("SELECT 1, 2, sum(b) FROM testData2"))
}

test("Group By Ordinal - negative cases") {
intercept[UnresolvedException[Aggregate]] {
sql("SELECT a, b FROM testData2 GROUP BY -1")
}

intercept[UnresolvedException[Aggregate]] {
sql("SELECT a, b FROM testData2 GROUP BY 3")
}

var e = intercept[UnresolvedException[Aggregate]](
sql("SELECT SUM(a) FROM testData2 GROUP BY 1"))
assert(e.getMessage contains
"Invalid call to Group by position: the '1'th column in the select contains " +
"an aggregate function")

e = intercept[UnresolvedException[Aggregate]](
sql("SELECT SUM(a) + 1 FROM testData2 GROUP BY 1"))
assert(e.getMessage contains
"Invalid call to Group by position: the '1'th column in the select contains " +
"an aggregate function")

var ae = intercept[AnalysisException](
sql("SELECT a, rand(0), sum(b) FROM testData2 GROUP BY a, 2"))
assert(ae.getMessage contains
"nondeterministic expression rand(0) should not appear in grouping expression")

ae = intercept[AnalysisException](
sql("SELECT * FROM testData2 GROUP BY a, b, 1"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do we handle the star case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latest merge removed it. : ) Let me add it back. Thanks!

BTW, the latest test build will fail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found a bug in star expansion. Will fix it first. Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sql("SELECT * FROM testData2 GROUP BY a, b, 1")

After the fix, it can resolve the star in the above query. Thanks!

assert(ae.getMessage contains
"Group by position: star is not allowed to use in the select list " +
"when using ordinals in group by")
}

test("Group By Ordinal: spark.sql.groupByOrdinal=false") {
withSQLConf(SQLConf.GROUP_BY_ORDINAL.key -> "false") {
// If spark.sql.groupByOrdinal=false, ignore the position number.
intercept[AnalysisException] {
sql("SELECT a, sum(b) FROM testData2 GROUP BY 1")
}
// '*' is not allowed to use in the select list when users specify ordinals in group by
checkAnswer(
sql("SELECT * FROM testData2 GROUP BY a, b, 1"),
sql("SELECT * FROM testData2 GROUP BY a, b"))
}
}

test("aggregates with nulls") {
checkAnswer(
sql("SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a)," +
Expand Down Expand Up @@ -2174,7 +2253,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"),
sql("SELECT * FROM testData2 ORDER BY b ASC"))

checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"))
Expand Down