From 1a95a05b97c26e88038e45103a639e25a0edaa4d Mon Sep 17 00:00:00 2001
From: pgandhi
Date: Sun, 19 Aug 2018 14:22:58 -0500
Subject: [PATCH 1/2] [SPARK-24935] : Problem with Executing Hive UDF's from
 Spark 2.2 Onwards

Added a new conf, spark.sql.execution.supportPartialAggregation, which is true
by default but can be set to false when using custom UDAFs that do not support
partial aggregation.

---
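Notes (annotation below the "---" marker; not applied by "git am"): a minimal
usage sketch for the new conf. The SparkSession `spark`, table `t`, and custom
Hive UDAF `my_custom_udaf` below are hypothetical placeholders, not part of
this patch:

    // Opt out of partial aggregation before planning queries over UDAFs
    // that cannot merge partially aggregated buffers.
    spark.conf.set("spark.sql.execution.supportPartialAggregation", "false")
    spark.sql("SELECT k, my_custom_udaf(v) FROM t GROUP BY k").explain()
    // With the conf disabled, planAggregateWithoutPartial emits a single
    // Complete-mode SortAggregateExec instead of a partial/final pair.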
 .../apache/spark/sql/internal/SQLConf.scala     |  8 ++++++++
 .../spark/sql/execution/SparkStrategies.scala   | 13 ++++++++++++-
 .../sql/execution/aggregate/AggUtils.scala      | 19 +++++++++++++++++++
 3 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dbb5bb43b4f1f..efe45cc453d8f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -971,6 +971,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val SUPPORT_PARTIAL_AGGREGATION = buildConf("spark.sql.execution.supportPartialAggregation")
+    .internal()
+    .doc("Decides whether partial aggregation is supported or not by the UDAF")
+    .booleanConf
+    .createWithDefault(true)
+
   val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream sink.")
@@ -1770,6 +1776,8 @@ class SQLConf extends Serializable with Logging {
 
   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
 
+  def supportPartialAggregation: Boolean = getConf(SUPPORT_PARTIAL_AGGREGATION)
+
   def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)
 
   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b4179f4d12d35..46d02029c05f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -411,7 +411,18 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       }
 
       val aggregateOperator =
-        if (functionsWithDistinct.isEmpty) {
+        if (!child.conf.supportPartialAggregation) {
+          if (functionsWithDistinct.nonEmpty) {
+            sys.error("Distinct columns cannot exist in Aggregate operator containing " +
+              "aggregate functions which don't support partial aggregation.")
+          } else {
+            aggregate.AggUtils.planAggregateWithoutPartial(
+              groupingExpressions,
+              aggregateExpressions,
+              resultExpressions,
+              planLater(child))
+          }
+        } else if (functionsWithDistinct.isEmpty) {
           aggregate.AggUtils.planAggregateWithoutDistinct(
             groupingExpressions,
             aggregateExpressions,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index c8ef2b3f6998d..705ba07f0b2af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -27,6 +27,25 @@ import org.apache.spark.sql.internal.SQLConf
  * Utility functions used by the query planner to convert our plan to new aggregation code path.
  */
 object AggUtils {
+
+  def planAggregateWithoutPartial(
+      groupingExpressions: Seq[NamedExpression],
+      aggregateExpressions: Seq[AggregateExpression],
+      resultExpressions: Seq[NamedExpression],
+      child: SparkPlan): Seq[SparkPlan] = {
+    val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
+    val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute)
+    SortAggregateExec(
+      requiredChildDistributionExpressions = Some(groupingExpressions),
+      groupingExpressions = groupingExpressions,
+      aggregateExpressions = completeAggregateExpressions,
+      aggregateAttributes = completeAggregateAttributes,
+      initialInputBufferOffset = 0,
+      resultExpressions = resultExpressions,
+      child = child
+    ) :: Nil
+  }
+
   private def createAggregate(
       requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
       groupingExpressions: Seq[NamedExpression] = Nil,

From 173035dcd239299f9f93cb70f7bdd863053463ff Mon Sep 17 00:00:00 2001
From: pgandhi
Date: Wed, 5 Sep 2018 18:13:25 -0400
Subject: [PATCH 2/2] [SPARK-24935] : Making the flag configurable per UDAF

---
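Notes (annotation below the "---" marker; not applied by "git am"): this
revision replaces the single global conf with one conf per UDAF category,
routed through the new AggregateFunction.supportsPartial hook, so Hive UDAFs
and window-function UDAFs can opt out independently. A sketch under the same
hypothetical names as before (`spark`, `t`, plus a Hive UDAF `my_hive_udaf`):

    // Disable partial aggregation for Hive UDAFs only; window-function
    // UDAFs and built-in aggregate functions keep the two-phase plan.
    spark.conf.set("spark.sql.execution.supportPartialAggregationHiveUDAF", "false")
    spark.sql("SELECT k, my_hive_udaf(v) FROM t GROUP BY k").explain()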
 .../expressions/aggregate/interfaces.scala       |  6 ++++++
 .../expressions/windowExpressions.scala          |  2 ++
 .../optimizer/RewriteDistinctAggregates.scala    |  6 ++++--
 .../org/apache/spark/sql/internal/SQLConf.scala  | 17 ++++++++++++++---
 .../spark/sql/execution/SparkStrategies.scala    |  2 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala     |  3 +++
 .../sql/hive/execution/TestingTypedCount.scala   |  3 +++
 7 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index e1d16a2cd38b0..c9465e61af637 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /** The mode of an [[AggregateFunction]]. */
@@ -183,6 +184,11 @@ abstract class AggregateFunction extends Expression {
    */
   def inputAggBufferAttributes: Seq[AttributeReference]
 
+  /**
+   * Indicates if this function supports partial aggregation.
+   */
+  def supportsPartial: Boolean = true
+
   /**
    * Result of the aggregate function when the input is empty. This is currently only used for the
    * proper rewriting of distinct aggregate functions.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 707f312499734..b8b45296a6ad6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedExcept
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, DeclarativeAggregate, NoOp}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -466,6 +467,7 @@ abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowF
   override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
   override def dataType: DataType = IntegerType
   override def nullable: Boolean = true
+  override def supportsPartial: Boolean = SQLConf.get.supportPartialAggregationWindowFunctionUDAF
   override lazy val mergeExpressions =
     throw new UnsupportedOperationException("Window Functions do not support merging.")
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
index 4448ace7105a4..08396f71ff7ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
@@ -131,8 +131,10 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
       }
     }
 
-    // Aggregation strategy can handle queries with a single distinct group.
-    if (distinctAggGroups.size > 1) {
+    // Check if the aggregates contain functions that do not support partial aggregation.
+    val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial)
+    // Aggregation strategy can handle queries with a single distinct group and partial aggregates.
+    if (distinctAggGroups.size > 1 || (distinctAggGroups.size == 1 && existsNonPartial)) {
       // Create the attributes for the grouping id and the group by clause.
       val gid = AttributeReference("gid", IntegerType, nullable = false)()
       val groupByMap = a.groupingExpressions.collect {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index efe45cc453d8f..72f79e4688459 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -971,9 +971,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val SUPPORT_PARTIAL_AGGREGATION = buildConf("spark.sql.execution.supportPartialAggregation")
+  val SUPPORT_PARTIAL_AGGREGATION_HIVE_UDAF =
+    buildConf("spark.sql.execution.supportPartialAggregationHiveUDAF")
     .internal()
-    .doc("Decides whether partial aggregation is supported or not by the UDAF")
+    .doc("Decides whether partial aggregation is supported or not by the Hive UDAF")
+    .booleanConf
+    .createWithDefault(true)
+
+  val SUPPORT_PARTIAL_AGGREGATION_WINDOW_FUNCTION_UDAF =
+    buildConf("spark.sql.execution.supportPartialAggregationWindowFunctionUDAF")
+    .internal()
+    .doc("Decides whether partial aggregation is supported or not by the window function UDAF")
     .booleanConf
     .createWithDefault(true)
 
@@ -1776,7 +1784,10 @@ class SQLConf extends Serializable with Logging {
 
   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
 
-  def supportPartialAggregation: Boolean = getConf(SUPPORT_PARTIAL_AGGREGATION)
+  def supportPartialAggregationHiveUDAF: Boolean = getConf(SUPPORT_PARTIAL_AGGREGATION_HIVE_UDAF)
+
+  def supportPartialAggregationWindowFunctionUDAF: Boolean =
+    getConf(SUPPORT_PARTIAL_AGGREGATION_WINDOW_FUNCTION_UDAF)
 
   def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 46d02029c05f9..d4f1b8646fe79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -411,7 +411,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       }
 
       val aggregateOperator =
-        if (!child.conf.supportPartialAggregation) {
+        if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
           if (functionsWithDistinct.nonEmpty) {
             sys.error("Distinct columns cannot exist in Aggregate operator containing " +
               "aggregate functions which don't support partial aggregation.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 68af99ea272a8..d23f592475762 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 
@@ -391,6 +392,8 @@ private[hive] case class HiveUDAFFunction(
 
   override def nullable: Boolean = true
 
+  override def supportsPartial: Boolean = SQLConf.get.supportPartialAggregationHiveUDAF
+
   override lazy val dataType: DataType = inspectorToDataType(returnInspector)
 
   override def prettyName: String = name
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
index 31b24301767af..1ac38e605d447 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
 import org.apache.spark.sql.hive.execution.TestingTypedCount.State
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 @ExpressionDescription(
@@ -42,6 +43,8 @@ case class TestingTypedCount(
 
   override def nullable: Boolean = false
 
+  override val supportsPartial: Boolean = true
+
   override def createAggregationBuffer(): State = TestingTypedCount.State(0L)
 
   override def update(buffer: State, input: InternalRow): State = {
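Note (annotation after the final hunk; not part of either commit): besides the
two conf-backed overrides, any aggregate can now declare its capability by
overriding supportsPartial, as TestingTypedCount above pins explicitly to true.
A hedged sketch of that override point on a hypothetical base class, mirroring
the AggregateWindowFunction pattern touched in PATCH 2/2:

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate

    // Hypothetical: an aggregate that declares merging unusable, steering the
    // planner to the single Complete-mode path added in PATCH 1/2.
    abstract class NonMergeableAggregate extends DeclarativeAggregate {
      override def supportsPartial: Boolean = false
      override lazy val mergeExpressions: Seq[Expression] =
        throw new UnsupportedOperationException(
          "This aggregate does not support partial aggregation.")
    }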