@@ -173,12 +173,6 @@ abstract class AggregateFunction extends Expression {
*/
def inputAggBufferAttributes: Seq[AttributeReference]

-  /**
-   * Indicates if this function supports partial aggregation.
-   * Currently Hive UDAF is the only one that doesn't support partial aggregation.
-   */
-  def supportsPartial: Boolean = true

/**
* Result of the aggregate function when the input is empty. This is currently only used for the
* proper rewriting of distinct aggregate functions.
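For readers outside the Catalyst internals: the flag removed above gated whether an aggregate could be split into a map-side partial step and a reduce-side merge step. A minimal, framework-free sketch of that two-phase contract (the names below are illustrative, not Spark's API):

// Illustrative sketch, not part of this diff: partial aggregation means each
// partition folds its rows into a small buffer, and buffers are then merged.
object PartialAggregationSketch {
  final case class AvgBuffer(sum: Long, count: Long)

  // Partial phase: fold one partition's rows into a buffer.
  def update(buf: AvgBuffer, value: Long): AvgBuffer =
    AvgBuffer(buf.sum + value, buf.count + 1)

  // Final phase: combine buffers produced by different partitions.
  def merge(left: AvgBuffer, right: AvgBuffer): AvgBuffer =
    AvgBuffer(left.sum + right.sum, left.count + right.count)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1L, 2L, 3L), Seq(4L, 5L))
    val partials = partitions.map(_.foldLeft(AvgBuffer(0L, 0L))(update))
    val total = partials.reduce(merge)
    println(total.sum.toDouble / total.count) // 3.0
  }
}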
@@ -436,7 +436,6 @@ abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction {
override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
override def dataType: DataType = IntegerType
override def nullable: Boolean = true
-  override def supportsPartial: Boolean = false
  override lazy val mergeExpressions =
    throw new UnsupportedOperationException("Window Functions do not support merging.")
}

Inline review comment from the PR author:

@hvanhovell is it allowed to use aggregate window functions in a normal aggregate?

Reply from hvanhovell (Contributor), Jan 3, 2017:

No, it is not. These functions should calculate a result for each value in a group for a single key. They also require that the group is ordered. Using them in a regular aggregate does not make much sense, since they would degrade into a count, a distinct count (if the groups are ordered; random otherwise), or some constant (probably 1).

I think throwing a NotImplementedError is fine in this case; there is logic in Catalyst to prevent a user from using these functions in a regular aggregate.
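A toy illustration of the reviewer's point, outside Spark: collapsing a running window aggregate such as a row number over a whole group to a single value just reproduces the group's size, so in a regular aggregate it degrades into a count.

// Illustrative sketch, not Spark code: a running row number reduced to one
// value per group is just COUNT, which is why regular aggregation of window
// functions is blocked in Catalyst and an unimplemented merge is acceptable.
object RunningAggregateDegenerates {
  def rowNumbers(orderedGroup: Seq[String]): Seq[Int] =
    orderedGroup.indices.map(_ + 1)

  def main(args: Array[String]): Unit = {
    val group = Seq("a", "b", "c", "d")
    val lastRunningValue = rowNumbers(group).last
    println(lastRunningValue == group.size) // true: it is only a count
  }
}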
@@ -131,11 +131,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}
}

-    // Check if the aggregates contains functions that do not support partial aggregation.
-    val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial)
-
-    // Aggregation strategy can handle queries with a single distinct group and partial aggregates.
-    if (distinctAggGroups.size > 1 || (distinctAggGroups.size == 1 && existsNonPartial)) {
+    // Aggregation strategy can handle queries with a single distinct group.
+    if (distinctAggGroups.size > 1) {
// Create the attributes for the grouping id and the group by clause.
val gid = AttributeReference("gid", IntegerType, nullable = false)(isGenerated = true)
val groupByMap = a.groupingExpressions.collect {
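For context on the condition above: a query that aggregates over two different DISTINCT column sets produces more than one distinct group and therefore triggers this rewrite. A small example using the public DataFrame API (assumes Spark on the classpath):

// Illustrative example, not part of this diff.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

object MultiDistinctExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("multi-distinct").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a", 10), (1, "b", 10), (2, "a", 20)).toDF("k", "s", "v")

    // Two different DISTINCT column sets => two distinct groups, so the
    // rewrite expands the input and aggregates per grouping id.
    df.groupBy($"k")
      .agg(countDistinct($"s"), countDistinct($"v"))
      .explain(true)

    spark.stop()
  }
}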
@@ -262,18 +262,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

val aggregateOperator =
-        if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
-          if (functionsWithDistinct.nonEmpty) {
-            sys.error("Distinct columns cannot exist in Aggregate operator containing " +
-              "aggregate functions which don't support partial aggregation.")
-          } else {
-            aggregate.AggUtils.planAggregateWithoutPartial(
-              groupingExpressions,
-              aggregateExpressions,
-              resultExpressions,
-              planLater(child))
-          }
-        } else if (functionsWithDistinct.isEmpty) {
+        if (functionsWithDistinct.isEmpty) {
aggregate.AggUtils.planAggregateWithoutDistinct(
groupingExpressions,
aggregateExpressions,
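Before this change, combining a DISTINCT aggregate with a function that did not support partial aggregation (per the removed comment, only Hive UDAFs) hit the sys.error branch deleted above; with partial support now universal, such a query simply plans. A hypothetical illustration ("my_hive_udaf" and table "t" are assumed to be registered elsewhere; they are not defined in this PR):

// Hypothetical illustration, not part of this diff: only the shape of the
// query matters. Assumes a Hive-enabled session, a registered Hive UDAF
// named "my_hive_udaf", and an existing table "t".
import org.apache.spark.sql.SparkSession

object DistinctPlusHiveUdafExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()
    spark.sql(
      """SELECT k, COUNT(DISTINCT s), my_hive_udaf(v)
        |FROM t
        |GROUP BY k""".stripMargin).explain()
    spark.stop()
  }
}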
@@ -27,26 +27,6 @@ import org.apache.spark.sql.internal.SQLConf
* Utility functions used by the query planner to convert our plan to new aggregation code path.
*/
object AggUtils {

-  def planAggregateWithoutPartial(
-      groupingExpressions: Seq[NamedExpression],
-      aggregateExpressions: Seq[AggregateExpression],
-      resultExpressions: Seq[NamedExpression],
-      child: SparkPlan): Seq[SparkPlan] = {
-
-    val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
-    val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute)
-    SortAggregateExec(
-      requiredChildDistributionExpressions = Some(groupingExpressions),
-      groupingExpressions = groupingExpressions,
-      aggregateExpressions = completeAggregateExpressions,
-      aggregateAttributes = completeAggregateAttributes,
-      initialInputBufferOffset = 0,
-      resultExpressions = resultExpressions,
-      child = child
-    ) :: Nil
-  }

private def createAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
groupingExpressions: Seq[NamedExpression] = Nil,
@@ -380,8 +380,6 @@ private[hive] case class HiveUDAFFunction(

override def nullable: Boolean = true

-  override def supportsPartial: Boolean = true

override lazy val dataType: DataType = inspectorToDataType(returnInspector)

override def prettyName: String = name
@@ -42,8 +42,6 @@ case class TestingTypedCount(

override def nullable: Boolean = false

-  override val supportsPartial: Boolean = true

override def createAggregationBuffer(): State = TestingTypedCount.State(0L)

override def update(buffer: State, input: InternalRow): State = {