From bc3aecbaeaaf39078eb64367461c2967d76a73c2 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Wed, 24 Aug 2022 14:50:48 +0200
Subject: [PATCH 1/2] [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge

---
 .../sql/connector/read/SupportsMerge.java     | 38 +++++++++++++++
 .../optimizer/MergeScalarSubqueries.scala     | 39 +++++++++++++++
 .../execution/datasources/v2/FileScan.scala   |  2 +-
 .../datasources/v2/parquet/ParquetScan.scala  | 47 +++++++++++++++----
 .../org/apache/spark/sql/SubquerySuite.scala  | 40 ++++++++++++++++
 5 files changed, 157 insertions(+), 9 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsMerge.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsMerge.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsMerge.java
new file mode 100644
index 0000000000000..1e502a1a9efa3
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsMerge.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface to indicate
+ * {@link Scan}s can be merged.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface SupportsMerge extends Scan {
+
+  /**
+   * Returns the merged scan.
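+   * Implementations should return {@code Optional.empty()} when this scan cannot be merged
+   * with {@code other}. The merged scan is created from {@code table} and its read schema is
+   * expected to cover the read schemas of both scans.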
+   */
+  Optional<SupportsMerge> mergeWith(SupportsMerge other, SupportsRead table);
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
index 4369ad9f96a6c..385d82c92c0be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
@@ -24,6 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationDef, CTERelationRef, Filter, Join, LogicalPlan, Project, Subquery, WithCTE}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{SCALAR_SUBQUERY, SCALAR_SUBQUERY_REFERENCE, TreePattern}
+import org.apache.spark.sql.connector.catalog.SupportsRead
+import org.apache.spark.sql.connector.read.SupportsMerge
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
@@ -279,6 +282,42 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
         }
       }
 
+      case (
+          DataSourceV2ScanRelation(newRelation, newScan: SupportsMerge, newOutput,
+            newKeyGroupedPartitioning, newOrdering),
+          DataSourceV2ScanRelation(cachedRelation, cachedScan: SupportsMerge, cachedOutput,
+            cachedKeyGroupedPartitioning, cachedOrdering)) =>
+        checkIdenticalPlans(newRelation, cachedRelation).flatMap { outputMap =>
+          val mappedNewKeyGroupedPartitioning =
+            newKeyGroupedPartitioning.map(_.map(mapAttributes(_, outputMap)))
+          if (mappedNewKeyGroupedPartitioning.map(_.map(_.canonicalized)) ==
+              cachedKeyGroupedPartitioning.map(_.map(_.canonicalized))) {
+            val mappedNewOrdering = newOrdering.map(_.map(mapAttributes(_, outputMap)))
+            if (mappedNewOrdering.map(_.map(_.canonicalized)) ==
+                cachedOrdering.map(_.map(_.canonicalized))) {
+              Option(cachedScan.mergeWith(newScan,
+                cachedRelation.table.asInstanceOf[SupportsRead]).orElse(null)).map { mergedScan =>
+                // Keep the original attributes of cached in merged
+                val mergedAttributes = mergedScan.readSchema().toAttributes
+                val cachedOutputNameMap = cachedOutput.map(a => a.name -> a).toMap
+                val mergedOutput = mergedAttributes.map {
+                  case a => cachedOutputNameMap.getOrElse(a.name, a)
+                }
+                // Build the map from new to merged
+                val mergedOutputNameMap = mergedOutput.map(a => a.name -> a).toMap
+                val newOutputMap =
+                  AttributeMap(newOutput.map(a => a -> mergedOutputNameMap(a.name).toAttribute))
+                DataSourceV2ScanRelation(cachedRelation, mergedScan, mergedOutput,
+                  cachedKeyGroupedPartitioning, cachedOrdering) -> newOutputMap
+              }
+            } else {
+              None
+            }
+          } else {
+            None
+          }
+        }
+
       // Otherwise merging is not possible.
       case _ => None
     })
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 9b6f99328663b..4eb56e06d744d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -82,7 +82,7 @@ trait FileScan extends Scan
 
   protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
 
-  private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
+  protected lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
     val partitionFilterAttributes = AttributeSet(partitionFilters).map(a => a.name -> a).toMap
     val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
       QueryPlan.normalizeExpressions(_, fileIndex.partitionSchema.toAttributes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index ff0b38880fd8e..eb8d701677677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.parquet
 
+import java.util.Optional
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -24,8 +26,9 @@ import org.apache.parquet.hadoop.ParquetInputFormat
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.catalog.SupportsRead
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.read.PartitionReaderFactory
+import org.apache.spark.sql.connector.read.{PartitionReaderFactory, SupportsMerge}
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitioningAwareFileIndex, RowIndexUtil}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetReadSupport, ParquetWriteSupport}
 import org.apache.spark.sql.execution.datasources.v2.FileScan
@@ -46,7 +49,7 @@ case class ParquetScan(
     options: CaseInsensitiveStringMap,
     pushedAggregate: Option[Aggregation] = None,
     partitionFilters: Seq[Expression] = Seq.empty,
-    dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
+    dataFilters: Seq[Expression] = Seq.empty) extends FileScan with SupportsMerge {
   override def isSplitable(path: Path): Boolean = {
     // If aggregate is pushed down, only the file footer will be read once,
     // so file should not be split across multiple tasks.
@@ -106,15 +109,18 @@ case class ParquetScan(
       new ParquetOptions(options.asCaseSensitiveMap.asScala.toMap, sqlConf))
   }
 
+  private def pushedDownAggEqual(p: ParquetScan) = {
+    if (pushedAggregate.nonEmpty && p.pushedAggregate.nonEmpty) {
+      AggregatePushDownUtils.equivalentAggregations(pushedAggregate.get, p.pushedAggregate.get)
+    } else {
+      pushedAggregate.isEmpty && p.pushedAggregate.isEmpty
+    }
+  }
+
   override def equals(obj: Any): Boolean = obj match {
     case p: ParquetScan =>
-      val pushedDownAggEqual = if (pushedAggregate.nonEmpty && p.pushedAggregate.nonEmpty) {
-        AggregatePushDownUtils.equivalentAggregations(pushedAggregate.get, p.pushedAggregate.get)
-      } else {
-        pushedAggregate.isEmpty && p.pushedAggregate.isEmpty
-      }
       super.equals(p) && dataSchema == p.dataSchema && options == p.options &&
-        equivalentFilters(pushedFilters, p.pushedFilters) && pushedDownAggEqual
+        equivalentFilters(pushedFilters, p.pushedFilters) && pushedDownAggEqual(p)
     case _ => false
   }
 
@@ -138,4 +144,29 @@ case class ParquetScan(
         Map("PushedAggregation" -> pushedAggregationsStr) ++
         Map("PushedGroupBy" -> pushedGroupByStr)
   }
+
+  override def mergeWith(other: SupportsMerge, table: SupportsRead): Optional[SupportsMerge] = {
+    if (other.isInstanceOf[ParquetScan]) {
+      val o = other.asInstanceOf[ParquetScan]
+      if (fileIndex == o.fileIndex &&
+        options == o.options &&
+        dataSchema == o.dataSchema &&
+        equivalentFilters(pushedFilters, o.pushedFilters) &&
+        pushedDownAggEqual(o) &&
+        normalizedPartitionFilters == o.normalizedPartitionFilters &&
+        normalizedDataFilters == o.normalizedDataFilters) {
+        val builder = table.newScanBuilder(options).asInstanceOf[ParquetScanBuilder]
+        pushedAggregate.map(builder.pushAggregation)
+        builder.pushFilters(dataFilters ++ partitionFilters)
+        builder.pruneColumns(readSchema().merge(o.readSchema()))
+        val scan = builder.build().asInstanceOf[ParquetScan]
+
+        Optional.of(scan)
+      } else {
+        Optional.empty()
+      }
+    } else {
+      Optional.empty()
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 1ae5ae68d0788..7d433213fb76e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2177,6 +2177,46 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-40259: Merge non-correlated scalar subqueries with Parquet DSv2 sources") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath { path =>
+        testData
+          .withColumn("partition", $"key" % 10)
+          .write
+          .mode(SaveMode.Overwrite)
+          .partitionBy("partition")
+          .parquet(path.getCanonicalPath)
+        withTempView("td") {
+          spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("td")
+          Seq(false).foreach { enableAQE =>
+            withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString) {
+              val df = sql(
+                """
+                  |SELECT
+                  |  (SELECT sum(key) FROM td WHERE partition < 5),
+                  |  (SELECT sum(key) FROM td WHERE partition >= 5),
+                  |  (SELECT sum(value) FROM td WHERE partition < 5),
+                  |  (SELECT sum(value) FROM td WHERE partition >= 5)
+                """.stripMargin)
+
+              checkAnswer(df, Row(2450, 2600, 2450.0, 2600.0) :: Nil)
+
+              val plan = df.queryExecution.executedPlan
+              val subqueryIds = collectWithSubqueries(plan) { case s: SubqueryExec => s.id }
+              val reusedSubqueryIds = collectWithSubqueries(plan) {
+                case rs: ReusedSubqueryExec => rs.child.id
+              }
+
+              assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan")
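+              // The four subqueries merge pairwise (same partition filter), so each remaining
+              // SubqueryExec is expected to be reused once by a ReusedSubqueryExec.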
+              assert(reusedSubqueryIds.size == 2,
+                "Missing or unexpected reused ReusedSubqueryExec in the plan")
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-39355: Single column uses quoted to construct UnresolvedAttribute") {
     checkAnswer(
       sql("""

From 0eefece6d6aae764b8f185dc171e6c89e22e5ee1 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Tue, 30 Aug 2022 08:57:13 +0200
Subject: [PATCH 2/2] fix test

---
 .../src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 7d433213fb76e..cc306551145eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2188,7 +2188,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
           .parquet(path.getCanonicalPath)
         withTempView("td") {
           spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("td")
-          Seq(false).foreach { enableAQE =>
+          Seq(false, true).foreach { enableAQE =>
            withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString) {
              val df = sql(
                """