--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /** The mode of an [[AggregateFunction]]. */
@@ -183,6 +184,11 @@ abstract class AggregateFunction extends Expression {
    */
   def inputAggBufferAttributes: Seq[AttributeReference]
 
+  /**
+   * Indicates if this function supports partial aggregation.
+   */
+  def supportsPartial: Boolean = true
+
   /**
    * Result of the aggregate function when the input is empty. This is currently only used for the
    * proper rewriting of distinct aggregate functions.
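The new hook defaults to true, so every existing aggregate function keeps its two-phase behavior; only implementations that override it opt out. As a rough sketch of what opting out looks like (the class and its members are hypothetical, not part of this change), a count-like DeclarativeAggregate could declare itself non-partial and leave mergeExpressions unimplemented, the same pattern AggregateWindowFunction uses below:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
    import org.apache.spark.sql.types._

    // Hypothetical: a count that refuses partial aggregation, forcing the
    // planner into a single Complete-mode pass per group.
    case class NonPartialCount(child: Expression) extends DeclarativeAggregate {
      override def children: Seq[Expression] = child :: Nil
      override def nullable: Boolean = false
      override def dataType: DataType = LongType

      // The new hook: opt out of the Partial/Final split.
      override def supportsPartial: Boolean = false

      private lazy val count = AttributeReference("count", LongType, nullable = false)()
      override lazy val aggBufferAttributes = count :: Nil
      override lazy val initialValues = Seq(Literal(0L))
      override lazy val updateExpressions = Seq(If(child.isNull, count, count + 1L))
      // Never invoked once supportsPartial is false.
      override lazy val mergeExpressions =
        throw new UnsupportedOperationException("partial aggregation is disabled")
      override lazy val evaluateExpression: Expression = count
    }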
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedExcept
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, DeclarativeAggregate, NoOp}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -466,6 +467,7 @@ abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowF
   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
   override def dataType: DataType = IntegerType
   override def nullable: Boolean = true
+  override def supportsPartial: Boolean = SQLConf.get.supportPartialAggregationWindowFunctionUDAF
   override lazy val mergeExpressions =
     throw new UnsupportedOperationException("Window Functions do not support merging.")
 }
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
@@ -131,8 +131,10 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
       }
     }
 
-    // Aggregation strategy can handle queries with a single distinct group.
-    if (distinctAggGroups.size > 1) {
+    // Check if the aggregates contain functions that do not support partial aggregation.
+    val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial)
+    // Aggregation strategy can handle queries with a single distinct group and partial aggregates.
+    if (distinctAggGroups.size > 1 || (distinctAggGroups.size == 1 && existsNonPartial)) {
       // Create the attributes for the grouping id and the group by clause.
       val gid = AttributeReference("gid", IntegerType, nullable = false)()
       val groupByMap = a.groupingExpressions.collect {
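In effect, the rule used to fire only for more than one distinct group; it now also fires for a single distinct group whenever another aggregate in the same operator cannot be partially aggregated, so the Expand-based rewrite removes the distinct before the planner has to reject the combination. A sketch of the two shapes, where my_udaf stands for a hypothetical function whose supportsPartial is false:

    // Two distinct groups: rewritten with Expand + gid, before and after this change.
    spark.sql("SELECT count(DISTINCT a), sum(DISTINCT b) FROM t").explain()

    // One distinct group plus a non-partial aggregate: now also rewritten,
    // instead of hitting the planner's sys.error path shown further below.
    spark.sql("SELECT count(DISTINCT a), my_udaf(b) FROM t").explain()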
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -971,6 +971,20 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val SUPPORT_PARTIAL_AGGREGATION_HIVE_UDAF =
+    buildConf("spark.sql.execution.supportPartialAggregationHiveUDAF")
+      .internal()
+      .doc("Decides whether partial aggregation is supported by Hive UDAFs.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val SUPPORT_PARTIAL_AGGREGATION_WINDOW_FUNCTION_UDAF =
+    buildConf("spark.sql.execution.supportPartialAggregationWindowFunctionUDAF")
+      .internal()
+      .doc("Decides whether partial aggregation is supported by window function UDAFs.")
+      .booleanConf
+      .createWithDefault(true)
+
   val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream sink.")
@@ -1770,6 +1784,11 @@ class SQLConf extends Serializable with Logging {

   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
 
+  def supportPartialAggregationHiveUDAF: Boolean = getConf(SUPPORT_PARTIAL_AGGREGATION_HIVE_UDAF)
+
+  def supportPartialAggregationWindowFunctionUDAF: Boolean =
+    getConf(SUPPORT_PARTIAL_AGGREGATION_WINDOW_FUNCTION_UDAF)
+
   def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)
 
   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
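Both entries are internal and default to true, so behavior is unchanged unless a session flips them. They are read through the thread-local SQLConf.get handle, which is how the overrides in AggregateWindowFunction and HiveUDAFFunction pick them up. A minimal sketch, assuming an existing SparkSession named spark:

    import org.apache.spark.sql.internal.SQLConf

    // Session-level toggle of the internal flag.
    spark.conf.set("spark.sql.execution.supportPartialAggregationHiveUDAF", "false")

    // The read path the new overrides use.
    SQLConf.get.supportPartialAggregationHiveUDAF            // now false for this session
    SQLConf.get.supportPartialAggregationWindowFunctionUDAF  // still true (default)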
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -411,7 +411,18 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         }
 
         val aggregateOperator =
-          if (functionsWithDistinct.isEmpty) {
+          if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
+            if (functionsWithDistinct.nonEmpty) {
+              sys.error("Distinct columns cannot exist in Aggregate operator containing " +
+                "aggregate functions which don't support partial aggregation.")
+            } else {
+              aggregate.AggUtils.planAggregateWithoutPartial(
+                groupingExpressions,
+                aggregateExpressions,
+                resultExpressions,
+                planLater(child))
+            }
+          } else if (functionsWithDistinct.isEmpty) {
             aggregate.AggUtils.planAggregateWithoutDistinct(
               groupingExpressions,
               aggregateExpressions,
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -27,6 +27,25 @@ import org.apache.spark.sql.internal.SQLConf
  * Utility functions used by the query planner to convert our plan to new aggregation code path.
  */
 object AggUtils {
+
+  def planAggregateWithoutPartial(
+      groupingExpressions: Seq[NamedExpression],
+      aggregateExpressions: Seq[AggregateExpression],
+      resultExpressions: Seq[NamedExpression],
+      child: SparkPlan): Seq[SparkPlan] = {
+    val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
+    val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute)
+    SortAggregateExec(
+      requiredChildDistributionExpressions = Some(groupingExpressions),
+      groupingExpressions = groupingExpressions,
+      aggregateExpressions = completeAggregateExpressions,
+      aggregateAttributes = completeAggregateAttributes,
+      initialInputBufferOffset = 0,
+      resultExpressions = resultExpressions,
+      child = child
+    ) :: Nil
+  }
+
   private def createAggregate(
       requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
       groupingExpressions: Seq[NamedExpression] = Nil,
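Because a non-partial function has no merge step, planAggregateWithoutPartial emits a single SortAggregateExec with every aggregate in Complete mode; requiredChildDistributionExpressions = Some(groupingExpressions) forces a shuffle on the grouping keys first, so each group is aggregated exactly once from raw input rows. Roughly, for a query like SELECT my_udaf(b) FROM t GROUP BY a (names hypothetical), the plan shape is a sketch along these lines rather than actual explain() output:

    // SortAggregate(key = [a], functions = [my_udaf(b)], mode = Complete)
    // +- Sort [a ASC]
    //    +- Exchange hashpartitioning(a)
    //       +- Scan t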
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 
@@ -391,6 +392,8 @@ private[hive] case class HiveUDAFFunction(

   override def nullable: Boolean = true
 
+  override def supportsPartial: Boolean = SQLConf.get.supportPartialAggregationHiveUDAF
+
   override lazy val dataType: DataType = inspectorToDataType(returnInspector)
 
   override def prettyName: String = name
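End to end, a Hive UDAF keeps its usual two-phase (Partial + Final) plan by default, and setting the new flag to false routes it through planAggregateWithoutPartial above. A sketch, with the UDAF class, function name, and table assumed:

    // Hypothetical Hive UDAF registration.
    spark.sql("CREATE TEMPORARY FUNCTION my_udaf AS 'com.example.hive.MyUDAF'")

    // Default (true): two-phase aggregation with buffer merging.
    spark.sql("SELECT key, my_udaf(value) FROM src GROUP BY key").explain()

    // Disable partial aggregation for Hive UDAFs: single Complete-mode pass.
    spark.conf.set("spark.sql.execution.supportPartialAggregationHiveUDAF", "false")
    spark.sql("SELECT key, my_udaf(value) FROM src GROUP BY key").explain()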
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
 import org.apache.spark.sql.hive.execution.TestingTypedCount.State
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 @ExpressionDescription(
@@ -42,6 +43,8 @@ case class TestingTypedCount(

   override def nullable: Boolean = false
 
+  override val supportsPartial: Boolean = true
+
   override def createAggregationBuffer(): State = TestingTypedCount.State(0L)
 
   override def update(buffer: State, input: InternalRow): State = {