Status: Closed

Commits (35)
96e3e36  Remove agg1 from the planner.  (yhuai, Nov 6, 2015)
9921733  Add needed constructors.  (yhuai, Nov 7, 2015)
f126520  Delete old agg functions.  (yhuai, Nov 7, 2015)
5a177ec  Update function registry and DataFrame functions.  (yhuai, Nov 7, 2015)
f5a389f  Update analyzer.  (yhuai, Nov 7, 2015)
3d06cfd  Update optimizer.  (yhuai, Nov 7, 2015)
7e77628  Update parsers, check analysis, and hive type coercion.  (yhuai, Nov 7, 2015)
55841a5  Update catalyst dsl.  (yhuai, Nov 7, 2015)
d2ca290  Update HiveTypeCoercion  (yhuai, Nov 7, 2015)
aa8daa9  Update logical operators.  (yhuai, Nov 8, 2015)
10630e1  Merge remote-tracking branch 'upstream/master' into removeAgg1  (yhuai, Nov 8, 2015)
e8d39d1  Remove the mapping from old agg functions to new agg functions.  (yhuai, Nov 8, 2015)
a3df1fd  Remove old agg operator.  (yhuai, Nov 8, 2015)
5ed4970  Throw exception when we have count distinct. Once https://github.com/…  (yhuai, Nov 8, 2015)
881afe8  Update tests.  (yhuai, Nov 8, 2015)
2451233  Style.  (yhuai, Nov 8, 2015)
1ef0b92  Check analysis.  (yhuai, Nov 9, 2015)
35dfe93  Fix AggregateQuerySuite.  (yhuai, Nov 9, 2015)
6d29195  Merge remote-tracking branch 'upstream/master' into removeAgg1  (yhuai, Nov 9, 2015)
3835110  Fix test.  (yhuai, Nov 9, 2015)
c88c97d  Since an agg function's children can be unresolved (when we use them …  (yhuai, Nov 9, 2015)
c6c6c09  Fix tests.  (yhuai, Nov 9, 2015)
b73156a  style  (yhuai, Nov 9, 2015)
cb3f4d3  Renaming:  (yhuai, Nov 9, 2015)
bf3dfb7  Fix style and tests.  (yhuai, Nov 9, 2015)
c22c05e  Update docs and comments. Also, move MultipleDistinctRewriter to the …  (yhuai, Nov 9, 2015)
10a86fd  Merge remote-tracking branch 'upstream/master' into removeAgg1  (yhuai, Nov 9, 2015)
bad4184  Fix tests and override checkInputDataTypes.  (yhuai, Nov 9, 2015)
76502b5  Change AggregateExpression's prettyString.  (yhuai, Nov 9, 2015)
302edac  Add spark.sql.specializeSingleDistinctAggPlanning  (yhuai, Nov 9, 2015)
b13c3f0  Update test.  (yhuai, Nov 10, 2015)
5b9f4b5  Merge remote-tracking branch 'upstream/master' into removeAgg1  (yhuai, Nov 10, 2015)
c37d179  Address comments  (yhuai, Nov 10, 2015)
fb64896  R  (yhuai, Nov 10, 2015)
16826e6  Update the prettyString of UnresolvedFunction.  (yhuai, Nov 10, 2015)
R/pkg/R/functions.R (1 addition, 1 deletion)

@@ -1339,7 +1339,7 @@ setMethod("pmod", signature(y = "Column"),
 #' @export
 setMethod("approxCountDistinct",
           signature(x = "Column"),
-          function(x, rsd = 0.95) {
+          function(x, rsd = 0.05) {
             jc <- callJStatic("org.apache.spark.sql.functions", "approxCountDistinct", x@jc, rsd)
             column(jc)
           })

Review thread on the changed line:

Contributor (Author): @shivaram Seems this default value should be 0.05 instead of 0.95, right?

Contributor: Yeah, looks like it should be 0.05 -- cc @davies. This seems to have been added back when SparkR was a separate code base, in davies/SparkR-pkg@d7b17a4.

Contributor: Good catch, thanks!
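For context: rsd is the target relative standard deviation of the underlying HyperLogLog++ estimate, so 0.05 means roughly 5% error, matching the default in the Scala and Python APIs. A minimal Scala sketch of the equivalent call (assuming a DataFrame named df with a column "id"; both names are illustrative):

    import org.apache.spark.sql.functions.{approxCountDistinct, col}

    // Approximate distinct count with at most ~5% relative error; 0.05 is the
    // default rsd, the value the SparkR signature is being corrected to above.
    val approx = df.select(approxCountDistinct(col("id"), 0.05))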
python/pyspark/sql/dataframe.py (1 addition, 1 deletion)

@@ -866,7 +866,7 @@ def selectExpr(self, *expr):
         This is a variant of :func:`select` that accepts SQL expressions.

         >>> df.selectExpr("age * 2", "abs(age)").collect()
-        [Row((age * 2)=4, 'abs(age)=2), Row((age * 2)=10, 'abs(age)=5)]
+        [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]
         """
         if len(expr) == 1 and isinstance(expr[0], list):
             expr = expr[0]
python/pyspark/sql/functions.py (1 addition, 1 deletion)

@@ -382,7 +382,7 @@ def expr(str):
     """Parses the expression string into the column that it represents

     >>> df.select(expr("length(name)")).collect()
-    [Row('length(name)=5), Row('length(name)=3)]
+    [Row(length(name)=5), Row(length(name)=3)]
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.expr(str))
python/pyspark/sql/tests.py (1 addition, 1 deletion)

@@ -1017,7 +1017,7 @@ def test_expr(self):
         row = Row(a="length string", b=75)
         df = self.sqlCtx.createDataFrame([row])
         result = df.select(functions.expr("length(a)")).collect()[0].asDict()
-        self.assertEqual(13, result["'length(a)"])
+        self.assertEqual(13, result["length(a)"])

     def test_replace(self):
         schema = StructType([

These three Python updates all reflect the new prettyString of UnresolvedFunction (commit 16826e6), which no longer prefixes an unresolved function name with a single quote when generating column names.
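The same renaming is visible from Scala; a small sketch (assuming a DataFrame df with a string column name):

    // Before this change the generated column name carried the unresolved-function
    // quote, e.g. 'length(name); it is now rendered as length(name).
    val colNames = df.selectExpr("length(name)").columns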
CatalystConf.scala (package org.apache.spark.sql.catalyst)

@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst

 private[spark] trait CatalystConf {
   def caseSensitiveAnalysis: Boolean
+
+  protected[spark] def specializeSingleDistinctAggPlanning: Boolean
 }

 /**
@@ -29,7 +31,13 @@ object EmptyConf extends CatalystConf {
   override def caseSensitiveAnalysis: Boolean = {
     throw new UnsupportedOperationException
   }
+
+  protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = {
+    throw new UnsupportedOperationException
+  }
 }

 /** A CatalystConf that can be used for local testing. */
-case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf
+case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf {
+  protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = true
+}
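Commit 302edac surfaces this flag as spark.sql.specializeSingleDistinctAggPlanning. A hypothetical sketch of how a rewrite rule can branch on it (illustration only; the helper name shouldRewrite is invented, and since the member is protected[spark], a real caller would live under the org.apache.spark package):

    // When the specialized single-distinct planning path is enabled, leave
    // queries with exactly one DISTINCT group to the planner; otherwise rewrite
    // every distinct aggregate into regular aggregates.
    def shouldRewrite(distinctGroupCount: Int, conf: CatalystConf): Boolean =
      if (conf.specializeSingleDistinctAggPlanning) distinctGroupCount > 1
      else distinctGroupCount >= 1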
SqlParser.scala (package org.apache.spark.sql.catalyst)

@@ -22,6 +22,7 @@ import scala.language.implicitConversions
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.DataTypeParser

@@ -272,7 +273,7 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected lazy val function: Parser[Expression] =
     ( ident <~ ("(" ~ "*" ~ ")") ^^ { case udfName =>
         if (lexical.normalizeKeyword(udfName) == "count") {
-          Count(Literal(1))
+          AggregateExpression(Count(Literal(1)), mode = Complete, isDistinct = false)
        } else {
          throw new AnalysisException(s"invalid expression $udfName(*)")
        }

Review thread on the COUNT(*) case:

Contributor: Is there a reason we can't just create unresolved functions here? (other than maybe the distinct case?)

Contributor (Author): Seems we hard-code this when we deal with count(*). We can change it in another PR.

@@ -281,22 +282,25 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       { case udfName ~ exprs => UnresolvedFunction(udfName, exprs, isDistinct = false) }
     | ident ~ ("(" ~ DISTINCT ~> repsep(expression, ",")) <~ ")" ^^ { case udfName ~ exprs =>
        lexical.normalizeKeyword(udfName) match {
-         case "sum" => SumDistinct(exprs.head)
-         case "count" => CountDistinct(exprs)
+         case "count" =>
+           aggregate.Count(exprs).toAggregateExpression(isDistinct = true)
          case _ => UnresolvedFunction(udfName, exprs, isDistinct = true)
        }
      }
     | APPROXIMATE ~> ident ~ ("(" ~ DISTINCT ~> expression <~ ")") ^^ { case udfName ~ exp =>
        if (lexical.normalizeKeyword(udfName) == "count") {
-         ApproxCountDistinct(exp)
+         AggregateExpression(new HyperLogLogPlusPlus(exp), mode = Complete, isDistinct = false)
        } else {
          throw new AnalysisException(s"invalid function approximate $udfName")
        }
      }
     | APPROXIMATE ~> "(" ~> unsignedFloat ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
       { case s ~ _ ~ udfName ~ _ ~ _ ~ exp =>
        if (lexical.normalizeKeyword(udfName) == "count") {
-         ApproxCountDistinct(exp, s.toDouble)
+         AggregateExpression(
+           HyperLogLogPlusPlus(exp, s.toDouble, 0, 0),
+           mode = Complete,
+           isDistinct = false)
        } else {
          throw new AnalysisException(s"invalid function approximate($s) $udfName")
        }
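Concretely, these are the SQL shapes the parser alternatives above match, sketched against a hypothetical SQLContext named sqlContext (table t and column a are illustrative):

    sqlContext.sql("SELECT COUNT(*) FROM t")                            // hard-coded Count(Literal(1))
    sqlContext.sql("SELECT COUNT(DISTINCT a) FROM t")                   // Count(...).toAggregateExpression(isDistinct = true)
    sqlContext.sql("SELECT APPROXIMATE COUNT(DISTINCT a) FROM t")       // HyperLogLogPlusPlus, default rsd
    sqlContext.sql("SELECT APPROXIMATE(0.01) COUNT(DISTINCT a) FROM t") // HyperLogLogPlusPlus with rsd = 0.01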
Analyzer.scala (package org.apache.spark.sql.catalyst.analysis)

@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2, AggregateFunction2}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef

@@ -79,6 +79,7 @@ class Analyzer(
       ExtractWindowExpressions ::
       GlobalAggregates ::
       ResolveAggregateFunctions ::
+      DistinctAggregationRewriter(conf) ::
       HiveTypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Nondeterministic", Once,

@@ -525,21 +526,14 @@ class Analyzer(
       case u @ UnresolvedFunction(name, children, isDistinct) =>
         withPosition(u) {
           registry.lookupFunction(name, children) match {
-            // We get an aggregate function built based on AggregateFunction2 interface.
-            // So, we wrap it in AggregateExpression2.
-            case agg2: AggregateFunction2 => AggregateExpression2(agg2, Complete, isDistinct)
-            // Currently, our old aggregate function interface supports SUM(DISTINCT ...)
-            // and COUTN(DISTINCT ...).
-            case sumDistinct: SumDistinct => sumDistinct
-            case countDistinct: CountDistinct => countDistinct
-            // DISTINCT is not meaningful with Max and Min.
-            case max: Max if isDistinct => max
-            case min: Min if isDistinct => min
-            // For other aggregate functions, DISTINCT keyword is not supported for now.
-            // Once we converted to the new code path, we will allow using DISTINCT keyword.
-            case other: AggregateExpression1 if isDistinct =>
-              failAnalysis(s"$name does not support DISTINCT keyword.")
-            // If it does not have DISTINCT keyword, we will return it as is.
+            // DISTINCT is not meaningful for a Max or a Min.
+            case max: Max if isDistinct =>
+              AggregateExpression(max, Complete, isDistinct = false)
+            case min: Min if isDistinct =>
+              AggregateExpression(min, Complete, isDistinct = false)
+            // We get an aggregate function, we need to wrap it in an AggregateExpression.
+            case agg2: AggregateFunction => AggregateExpression(agg2, Complete, isDistinct)
             // This function is not an aggregate function, just return the resolved one.
             case other => other
           }
         }

Review thread on the AggregateFunction case:

Contributor: agg2 -> agg

Contributor (Author): Done
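A quick illustration of the resolution rules above (a sketch; assumes a SQLContext named sqlContext and a registered table t with a column a):

    // DISTINCT is dropped for Max and Min, so these two resolve identically:
    sqlContext.sql("SELECT MAX(DISTINCT a) FROM t")
    sqlContext.sql("SELECT MAX(a) FROM t")
    // Any other resolved AggregateFunction keeps its isDistinct flag when it is
    // wrapped in an AggregateExpression:
    sqlContext.sql("SELECT SUM(DISTINCT a) FROM t")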
CheckAnalysis.scala (package org.apache.spark.sql.catalyst.analysis)

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, AggregateExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
@@ -108,7 +109,19 @@ trait CheckAnalysis {

       case Aggregate(groupingExprs, aggregateExprs, child) =>
         def checkValidAggregateExpression(expr: Expression): Unit = expr match {
-          case _: AggregateExpression => // OK
+          case aggExpr: AggregateExpression =>
+            // TODO: Is it possible that the child of a agg function is another
+            // agg function?
+            aggExpr.aggregateFunction.children.foreach {
+              // This is just a sanity check, our analysis rule PullOutNondeterministic should
+              // already pull out those nondeterministic expressions and evaluate them in
+              // a Project node.
+              case child if !child.deterministic =>
+                failAnalysis(
+                  s"nondeterministic expression ${expr.prettyString} should not " +
+                    s"appear in the arguments of an aggregate function.")
+              case child => // OK
+            }
           case e: Attribute if !groupingExprs.exists(_.semanticEquals(e)) =>
             failAnalysis(
               s"expression '${e.prettyString}' is neither present in the group by, " +

Review thread on the TODO:

Contributor: After the rewrite for multiple distinct, I think it's possible.

Contributor (Author): Seems our error message is not good. I will update this. We should not allow users to nest agg functions like sum(avg(...)). I will add an error message telling users to use sub-queries.
@@ -120,14 +133,26 @@ trait CheckAnalysis {
           case e => e.children.foreach(checkValidAggregateExpression)
         }

-        def checkValidGroupingExprs(expr: Expression): Unit = expr.dataType match {
-          case BinaryType =>
-            failAnalysis(s"binary type expression ${expr.prettyString} cannot be used " +
-              "in grouping expression")
-          case m: MapType =>
-            failAnalysis(s"map type expression ${expr.prettyString} cannot be used " +
-              "in grouping expression")
-          case _ => // OK
+        def checkValidGroupingExprs(expr: Expression): Unit = {
+          expr.dataType match {
+            case BinaryType =>
+              failAnalysis(s"binary type expression ${expr.prettyString} cannot be used " +
+                "in grouping expression")
+            case a: ArrayType =>
+              failAnalysis(s"array type expression ${expr.prettyString} cannot be used " +
+                "in grouping expression")
+            case m: MapType =>
+              failAnalysis(s"map type expression ${expr.prettyString} cannot be used " +
+                "in grouping expression")
+            case _ => // OK
+          }
+          if (!expr.deterministic) {
+            // This is just a sanity check, our analysis rule PullOutNondeterministic should
+            // already pull out those nondeterministic expressions and evaluate them in
+            // a Project node.
+            failAnalysis(s"nondeterministic expression ${expr.prettyString} should not " +
+              s"appear in grouping expression.")
+          }
+        }

         aggregateExprs.foreach(checkValidAggregateExpression)

Review thread on the new ArrayType case:

Contributor: Is this a regression?

Contributor (Author): Unfortunately, it is the case.

Review thread on the catch-all case:

Contributor: We should also check UDT and types inside StructType.

Contributor (Author): Done.
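Concretely, the grouping checks above now reject queries like the following (a sketch; t and its binary, array, and map columns are hypothetical):

    // Each of these fails analysis with a "... cannot be used in grouping
    // expression" error for the corresponding type:
    sqlContext.sql("SELECT count(*) FROM t GROUP BY bin_col")  // BinaryType
    sqlContext.sql("SELECT count(*) FROM t GROUP BY arr_col")  // ArrayType (newly rejected here)
    sqlContext.sql("SELECT count(*) FROM t GROUP BY map_col")  // MapType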
@@ -179,7 +204,8 @@ trait CheckAnalysis {
             s"unresolved operator ${operator.simpleString}")

         case o if o.expressions.exists(!_.deterministic) &&
-          !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] =>
+          !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && !o.isInstanceOf[Aggregate] =>
+          // Aggregate operators are checked by the Aggregate case above.
           failAnalysis(
             s"""nondeterministic expressions are only allowed in Project or Filter, found:
                | ${o.expressions.map(_.prettyString).mkString(",")}