From 6bd09b1bcc0667f8bae5694a6bff293398dd168d Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 24 Jan 2019 17:02:45 +0800
Subject: [PATCH 1/7] fix
---
docs/sql-data-sources-parquet.md | 12 ------------
.../org/apache/spark/sql/internal/SQLConf.scala | 5 +++--
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 13 ++++++++++++-
.../execution/OptimizeMetadataOnlyQuerySuite.scala | 4 ++--
.../spark/sql/hive/execution/SQLQuerySuite.scala | 13 ++++++++++++-
5 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md
index dcd2936518465..5532bf9fa3f66 100644
--- a/docs/sql-data-sources-parquet.md
+++ b/docs/sql-data-sources-parquet.md
@@ -295,18 +295,6 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
-<tr>
-  <td><code>spark.sql.optimizer.metadataOnly</code></td>
-  <td>true</td>
-  <td>
-    <p>
-      When true, enable the metadata-only query optimization that use the table's metadata to
-      produce the partition columns instead of table scans. It applies when all the columns scanned
-      are partition columns and the query has an aggregate operator that satisfies distinct
-      semantics.
-    </p>
-  </td>
-</tr>
<tr>
  <td><code>spark.sql.parquet.writeLegacyFormat</code></td>
  <td>false</td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6b301c3c9cb5b..f6113447a1484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -585,9 +585,10 @@ object SQLConf {
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
"scanned are partition columns and the query has an aggregate operator that satisfies " +
- "distinct semantics.")
+ "distinct semantics. By default the optimization is disabled, since it may return " +
+ "incorrect results with empty tables.")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
.doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 806f0b2239fe6..68cd3a0f3eef3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2422,7 +2422,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(s"$expected") :: Nil)
}
- test("SPARK-15752 optimize metadata only query for datasource table") {
+ ignore("SPARK-15752 optimize metadata only query for datasource table") {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
withTable("srcpart_15752") {
val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "a" else "b"))
@@ -2966,6 +2966,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}
+
+ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+ withTable("t") {
+ sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+ sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+ checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+ checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+ }
+ }
+ }
}
case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index a543eb8351656..f37802309be97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -58,7 +58,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
}
private def testMetadataOnly(name: String, sqls: String*): Unit = {
- test(name) {
+ ignore(name) {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
}
@@ -69,7 +69,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
}
private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
- test(name) {
+ ignore(name) {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 70efad103d13e..df4d536a9c9c2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -86,6 +86,17 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(message.contains("Table or view not found"))
}
+ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+ withTable("t") {
+ sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+ sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+ checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+ checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+ }
+ }
+ }
+
test("script") {
assume(TestUtils.testCommandAvailable("/bin/bash"))
assume(TestUtils.testCommandAvailable("echo | sed"))
@@ -1770,7 +1781,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("SPARK-15752 optimize metadata only query for hive table") {
+ ignore("SPARK-15752 optimize metadata only query for hive table") {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
withTable("data_15752", "srcpart_15752", "srctext_15752") {
val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
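
A minimal sketch of the wrong result the new default guards against (not part of the patch; it mirrors the SPARK-26709 test added above and assumes a Spark shell where spark.sql.optimizer.metadataOnly has been turned back on):

    // Hypothetical spark-shell reproduction, assuming the metadata-only rule is enabled.
    spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
    spark.sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
    // range(1, 1) is empty, so partition p1=5 is registered but holds no rows.
    spark.sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
    // The metadata-only plan answers the aggregate from partition metadata and
    // returns 5; the correct answer over zero rows is NULL.
    spark.sql("SELECT MAX(p1) FROM t").show()
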
From 1c81586d7c7b3a7a9785719c486a6d857265eb89 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 24 Jan 2019 22:35:14 +0800
Subject: [PATCH 2/7] revert the whole rule
---
.../apache/spark/sql/internal/SQLConf.scala | 11 --
.../execution/OptimizeMetadataOnlyQuery.scala | 173 ------------------
.../spark/sql/execution/SparkOptimizer.scala | 1 -
.../org/apache/spark/sql/SQLQuerySuite.scala | 47 -----
.../OptimizeMetadataOnlyQuerySuite.scala | 150 ---------------
.../OptimizeHiveMetadataOnlyQuerySuite.scala | 78 --------
.../sql/hive/execution/SQLQuerySuite.scala | 100 ----------
7 files changed, 560 deletions(-)
delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f6113447a1484..516193bdeaaa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -581,15 +581,6 @@ object SQLConf {
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
- val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
- .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
- "to produce the partition columns instead of table scans. It applies when all the columns " +
- "scanned are partition columns and the query has an aggregate operator that satisfies " +
- "distinct semantics. By default the optimization is disabled, since it may return " +
- "incorrect results with empty tables.")
- .booleanConf
- .createWithDefault(false)
-
val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
.doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
"to parse.")
@@ -1784,8 +1775,6 @@ class SQLConf extends Serializable with Logging {
def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
- def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
-
def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
def wholeStageUseIdInClassName: Boolean = getConf(WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
deleted file mode 100644
index 3ca03ab2939aa..0000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.util.Locale
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * This rule optimizes the execution of queries that can be answered by looking only at
- * partition-level metadata. This applies when all the columns scanned are partition columns, and
- * the query has an aggregate operator that satisfies the following conditions:
- * 1. aggregate expression is partition columns.
- * e.g. SELECT col FROM tbl GROUP BY col.
- * 2. aggregate function on partition columns with DISTINCT.
- * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
- * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
- * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
- */
-case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] {
-
- def apply(plan: LogicalPlan): LogicalPlan = {
- if (!SQLConf.get.optimizerMetadataOnly) {
- return plan
- }
-
- plan.transform {
- case a @ Aggregate(_, aggExprs, child @ PhysicalOperation(
- projectList, filters, PartitionedRelation(partAttrs, rel))) =>
- // We only apply this optimization when only partitioned attributes are scanned.
- if (AttributeSet((projectList ++ filters).flatMap(_.references)).subsetOf(partAttrs)) {
- // The project list and filters all only refer to partition attributes, which means the
- // the Aggregator operator can also only refer to partition attributes, and filters are
- // all partition filters. This is a metadata only query we can optimize.
- val aggFunctions = aggExprs.flatMap(_.collect {
- case agg: AggregateExpression => agg
- })
- val isAllDistinctAgg = aggFunctions.forall { agg =>
- agg.isDistinct || (agg.aggregateFunction match {
- // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
- // they have DISTINCT keyword or not, as the result will be same.
- case _: Max => true
- case _: Min => true
- case _: First => true
- case _: Last => true
- case _ => false
- })
- }
- if (isAllDistinctAgg) {
- a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
- } else {
- a
- }
- } else {
- a
- }
- }
- }
-
- /**
- * Returns the partition attributes of the table relation plan.
- */
- private def getPartitionAttrs(
- partitionColumnNames: Seq[String],
- relation: LogicalPlan): Seq[Attribute] = {
- val attrMap = relation.output.map(a => a.name.toLowerCase(Locale.ROOT) -> a).toMap
- partitionColumnNames.map { colName =>
- attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
- throw new AnalysisException(s"Unable to find the column `$colName` " +
- s"given [${relation.output.map(_.name).mkString(", ")}]")
- )
- }
- }
-
- /**
- * Transform the given plan, find its table scan nodes that matches the given relation, and then
- * replace the table scan node with its corresponding partition values.
- */
- private def replaceTableScanWithPartitionMetadata(
- child: LogicalPlan,
- relation: LogicalPlan,
- partFilters: Seq[Expression]): LogicalPlan = {
- // this logic comes from PruneFileSourcePartitions. it ensures that the filter names match the
- // relation's schema. PartitionedRelation ensures that the filters only reference partition cols
- val normalizedFilters = partFilters.map { e =>
- e transform {
- case a: AttributeReference =>
- a.withName(relation.output.find(_.semanticEquals(a)).get.name)
- }
- }
-
- child transform {
- case plan if plan eq relation =>
- relation match {
- case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
- val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
- val partitionData = fsRelation.location.listFiles(normalizedFilters, Nil)
- LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)
-
- case relation: HiveTableRelation =>
- val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
- val caseInsensitiveProperties =
- CaseInsensitiveMap(relation.tableMeta.storage.properties)
- val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
- .getOrElse(SQLConf.get.sessionLocalTimeZone)
- val partitions = if (partFilters.nonEmpty) {
- catalog.listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters)
- } else {
- catalog.listPartitions(relation.tableMeta.identifier)
- }
-
- val partitionData = partitions.map { p =>
- InternalRow.fromSeq(partAttrs.map { attr =>
- Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
- })
- }
- LocalRelation(partAttrs, partitionData)
-
- case _ =>
- throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
- s"please turn off ${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try again.")
- }
- }
- }
-
- /**
- * A pattern that finds the partitioned table relation node inside the given plan, and returns a
- * pair of the partition attributes and the table relation node.
- */
- object PartitionedRelation extends PredicateHelper {
-
- def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = {
- plan match {
- case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
- if fsRelation.partitionSchema.nonEmpty =>
- val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
- Some((partAttrs, l))
-
- case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
- val partAttrs = AttributeSet(
- getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
- Some((partAttrs, relation))
-
- case _ => None
- }
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 6c6d344240cea..5e5255ec91e8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -30,7 +30,6 @@ class SparkOptimizer(
extends Optimizer(catalog) {
override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
- Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Extract Python UDFs", Once,
Seq(ExtractPythonUDFFromAggregate, ExtractPythonUDFs): _*) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 68cd3a0f3eef3..2a33101c23dbe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2422,42 +2422,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(s"$expected") :: Nil)
}
- ignore("SPARK-15752 optimize metadata only query for datasource table") {
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
- withTable("srcpart_15752") {
- val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "a" else "b"))
- .toDF("col1", "col2", "partcol1", "partcol2")
- data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart_15752")
- checkAnswer(
- sql("select partcol1 from srcpart_15752 group by partcol1"),
- Row(0) :: Row(1) :: Nil)
- checkAnswer(
- sql("select partcol1 from srcpart_15752 where partcol1 = 1 group by partcol1"),
- Row(1))
- checkAnswer(
- sql("select partcol1, count(distinct partcol2) from srcpart_15752 group by partcol1"),
- Row(0, 1) :: Row(1, 1) :: Nil)
- checkAnswer(
- sql("select partcol1, count(distinct partcol2) from srcpart_15752 where partcol1 = 1 " +
- "group by partcol1"),
- Row(1, 1) :: Nil)
- checkAnswer(sql("select distinct partcol1 from srcpart_15752"), Row(0) :: Row(1) :: Nil)
- checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
- checkAnswer(
- sql("select distinct col from (select partcol1 + 1 as col from srcpart_15752 " +
- "where partcol1 = 1) t"),
- Row(2))
- checkAnswer(sql("select max(partcol1) from srcpart_15752"), Row(1))
- checkAnswer(sql("select max(partcol1) from srcpart_15752 where partcol1 = 1"), Row(1))
- checkAnswer(sql("select max(partcol1) from (select partcol1 from srcpart_15752) t"), Row(1))
- checkAnswer(
- sql("select max(col) from (select partcol1 + 1 as col from srcpart_15752 " +
- "where partcol1 = 1) t"),
- Row(2))
- }
- }
- }
-
test("SPARK-16975: Column-partition path starting '_' should be handled correctly") {
withTempDir { dir =>
val dataDir = new File(dir, "data").getCanonicalPath
@@ -2966,17 +2930,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}
-
- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
- withTable("t") {
- sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
- sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
- checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
- checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
- }
- }
- }
}
case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
deleted file mode 100644
index f37802309be97..0000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.io.File
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
-import org.apache.spark.sql.test.SharedSQLContext
-
-class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
- import testImplicits._
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
- .toDF("col1", "col2", "partcol1", "partcol2")
- data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
- }
-
- override protected def afterAll(): Unit = {
- try {
- sql("DROP TABLE IF EXISTS srcpart")
- } finally {
- super.afterAll()
- }
- }
-
- private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
- val localRelations = df.queryExecution.optimizedPlan.collect {
- case l @ LocalRelation(_, _, _) => l
- }
- assert(localRelations.size == 1)
- }
-
- private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
- val localRelations = df.queryExecution.optimizedPlan.collect {
- case l @ LocalRelation(_, _, _) => l
- }
- assert(localRelations.size == 0)
- }
-
- private def testMetadataOnly(name: String, sqls: String*): Unit = {
- ignore(name) {
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
- sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
- }
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
- sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
- }
- }
- }
-
- private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
- ignore(name) {
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
- sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
- }
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
- sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
- }
- }
- }
-
- testMetadataOnly(
- "Aggregate expression is partition columns",
- "select partcol1 from srcpart group by partcol1",
- "select partcol2 from srcpart where partcol1 = 0 group by partcol2")
-
- testMetadataOnly(
- "Distinct aggregate function on partition columns",
- "SELECT partcol1, count(distinct partcol2) FROM srcpart group by partcol1",
- "SELECT partcol1, count(distinct partcol2) FROM srcpart where partcol1 = 0 group by partcol1")
-
- testMetadataOnly(
- "Distinct on partition columns",
- "select distinct partcol1, partcol2 from srcpart",
- "select distinct c1 from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t")
-
- testMetadataOnly(
- "Aggregate function on partition columns which have same result w or w/o DISTINCT keyword",
- "select max(partcol1) from srcpart",
- "select min(partcol1) from srcpart where partcol1 = 0",
- "select first(partcol1) from srcpart",
- "select last(partcol1) from srcpart where partcol1 = 0",
- "select partcol2, min(partcol1) from srcpart where partcol1 = 0 group by partcol2",
- "select max(c1) from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t")
-
- testNotMetadataOnly(
- "Don't optimize metadata only query for non-partition columns",
- "select col1 from srcpart group by col1",
- "select partcol1, max(col1) from srcpart group by partcol1",
- "select partcol1, count(distinct col1) from srcpart group by partcol1",
- "select distinct partcol1, col1 from srcpart")
-
- testNotMetadataOnly(
- "Don't optimize metadata only query for non-distinct aggregate function on partition columns",
- "select partcol1, sum(partcol2) from srcpart group by partcol1",
- "select partcol1, count(partcol2) from srcpart group by partcol1")
-
- testNotMetadataOnly(
- "Don't optimize metadata only query for GroupingSet/Union operator",
- "select partcol1, max(partcol2) from srcpart where partcol1 = 0 group by rollup (partcol1)",
- "select partcol2 from (select partcol2 from srcpart where partcol1 = 0 union all " +
- "select partcol2 from srcpart where partcol1 = 1) t group by partcol2")
-
- test("SPARK-21884 Fix StackOverflowError on MetadataOnlyQuery") {
- withTable("t_1000") {
- sql("CREATE TABLE t_1000 (a INT, p INT) USING PARQUET PARTITIONED BY (p)")
- (1 to 1000).foreach(p => sql(s"ALTER TABLE t_1000 ADD PARTITION (p=$p)"))
- sql("SELECT COUNT(DISTINCT p) FROM t_1000").collect()
- }
- }
-
- test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
- withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
- withTempPath { path =>
- val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
- Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
- .write.json(tablePath.getCanonicalPath)
-
- val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
- checkAnswer(df, Row("a", "e", "c"))
-
- val localRelation = df.queryExecution.optimizedPlan.collectFirst {
- case l: LocalRelation => l
- }
- assert(localRelation.nonEmpty, "expect to see a LocalRelation")
- assert(localRelation.get.output.map(_.name) == Seq("cOl3", "cOl1", "cOl5"))
- }
- }
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
deleted file mode 100644
index 1e525c46a9cfb..0000000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
-import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-
-class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleton
- with BeforeAndAfter with SQLTestUtils {
-
- import spark.implicits._
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
- (0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
- }
-
- override protected def afterAll(): Unit = {
- try {
- sql("DROP TABLE IF EXISTS metadata_only")
- } finally {
- super.afterAll()
- }
- }
-
- test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
- withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
- val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
-
- // verify the number of matching partitions
- assert(sql("SELECT DISTINCT part FROM metadata_only WHERE part < 5").collect().length === 5)
-
- // verify that the partition predicate was pushed down to the metastore
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount === 5)
- }
- }
-
- test("SPARK-23877: filter on projected expression") {
- withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
- val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
-
- // verify the matching partitions
- val partitions = spark.internalCreateDataFrame(Distinct(Filter(($"x" < 5).expr,
- Project(Seq(($"part" + 1).as("x").expr.asInstanceOf[NamedExpression]),
- spark.table("metadata_only").logicalPlan.asInstanceOf[SubqueryAlias].child)))
- .queryExecution.toRdd, StructType(Seq(StructField("x", IntegerType))))
-
- checkAnswer(partitions, Seq(1, 2, 3, 4).toDF("x"))
-
- // verify that the partition predicate was not pushed down to the metastore
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount == 11)
- }
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index df4d536a9c9c2..01c40d1541734 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -86,17 +86,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(message.contains("Table or view not found"))
}
- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
- withTable("t") {
- sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
- sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
- checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
- checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
- }
- }
- }
-
test("script") {
assume(TestUtils.testCommandAvailable("/bin/bash"))
assume(TestUtils.testCommandAvailable("echo | sed"))
@@ -1781,95 +1770,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- ignore("SPARK-15752 optimize metadata only query for hive table") {
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
- withTable("data_15752", "srcpart_15752", "srctext_15752") {
- val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
- df.createOrReplaceTempView("data_15752")
- sql(
- """
- |CREATE TABLE srcpart_15752 (col1 INT, col2 STRING)
- |PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS parquet
- """.stripMargin)
- for (partcol1 <- Seq(0, 1); partcol2 <- Seq("a", "b")) {
- sql(
- s"""
- |INSERT OVERWRITE TABLE srcpart_15752
- |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
- |select key, value from data_15752
- """.stripMargin)
- }
- checkAnswer(
- sql("select partcol1 from srcpart_15752 group by partcol1"),
- Row(0) :: Row(1) :: Nil)
- checkAnswer(
- sql("select partcol1 from srcpart_15752 where partcol1 = 1 group by partcol1"),
- Row(1))
- checkAnswer(
- sql("select partcol1, count(distinct partcol2) from srcpart_15752 group by partcol1"),
- Row(0, 2) :: Row(1, 2) :: Nil)
- checkAnswer(
- sql("select partcol1, count(distinct partcol2) from srcpart_15752 where partcol1 = 1 " +
- "group by partcol1"),
- Row(1, 2) :: Nil)
- checkAnswer(sql("select distinct partcol1 from srcpart_15752"), Row(0) :: Row(1) :: Nil)
- checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
- checkAnswer(
- sql("select distinct col from (select partcol1 + 1 as col from srcpart_15752 " +
- "where partcol1 = 1) t"),
- Row(2))
- checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
- checkAnswer(sql("select max(partcol1) from srcpart_15752"), Row(1))
- checkAnswer(sql("select max(partcol1) from srcpart_15752 where partcol1 = 1"), Row(1))
- checkAnswer(sql("select max(partcol1) from (select partcol1 from srcpart_15752) t"), Row(1))
- checkAnswer(
- sql("select max(col) from (select partcol1 + 1 as col from srcpart_15752 " +
- "where partcol1 = 1) t"),
- Row(2))
-
- sql(
- """
- |CREATE TABLE srctext_15752 (col1 INT, col2 STRING)
- |PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS textfile
- """.stripMargin)
- for (partcol1 <- Seq(0, 1); partcol2 <- Seq("a", "b")) {
- sql(
- s"""
- |INSERT OVERWRITE TABLE srctext_15752
- |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
- |select key, value from data_15752
- """.stripMargin)
- }
- checkAnswer(
- sql("select partcol1 from srctext_15752 group by partcol1"),
- Row(0) :: Row(1) :: Nil)
- checkAnswer(
- sql("select partcol1 from srctext_15752 where partcol1 = 1 group by partcol1"),
- Row(1))
- checkAnswer(
- sql("select partcol1, count(distinct partcol2) from srctext_15752 group by partcol1"),
- Row(0, 2) :: Row(1, 2) :: Nil)
- checkAnswer(
- sql("select partcol1, count(distinct partcol2) from srctext_15752 where partcol1 = 1 " +
- "group by partcol1"),
- Row(1, 2) :: Nil)
- checkAnswer(sql("select distinct partcol1 from srctext_15752"), Row(0) :: Row(1) :: Nil)
- checkAnswer(sql("select distinct partcol1 from srctext_15752 where partcol1 = 1"), Row(1))
- checkAnswer(
- sql("select distinct col from (select partcol1 + 1 as col from srctext_15752 " +
- "where partcol1 = 1) t"),
- Row(2))
- checkAnswer(sql("select max(partcol1) from srctext_15752"), Row(1))
- checkAnswer(sql("select max(partcol1) from srctext_15752 where partcol1 = 1"), Row(1))
- checkAnswer(sql("select max(partcol1) from (select partcol1 from srctext_15752) t"), Row(1))
- checkAnswer(
- sql("select max(col) from (select partcol1 + 1 as col from srctext_15752 " +
- "where partcol1 = 1) t"),
- Row(2))
- }
- }
- }
-
test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
sql(
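
For context on what the deleted rule did (a sketch, not part of the patch): when enabled, it replaced the table scan of an aggregate that only touches partition columns with a LocalRelation built from the catalog's partition values, which is why assertMetadataOnlyQuery in the deleted suite expects exactly one LocalRelation in the optimized plan, roughly:

    // Hypothetical check mirroring assertMetadataOnlyQuery from the deleted suite,
    // assuming its srcpart table exists and spark.sql.optimizer.metadataOnly=true.
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val df = spark.sql("select max(partcol1) from srcpart")
    val localRelations = df.queryExecution.optimizedPlan.collect {
      case l: LocalRelation => l
    }
    // The file scan has been replaced by a LocalRelation of partition values.
    assert(localRelations.size == 1)
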
From 15cc8726a6454e45e5330bebd7e96d31500c2d6b Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Fri, 25 Jan 2019 01:34:21 +0800
Subject: [PATCH 3/7] Revert "revert the whole rule"
This reverts commit 1c81586d7c7b3a7a9785719c486a6d857265eb89.
---
.../apache/spark/sql/internal/SQLConf.scala | 11 ++
.../execution/OptimizeMetadataOnlyQuery.scala | 173 ++++++++++++++++++
.../spark/sql/execution/SparkOptimizer.scala | 1 +
.../org/apache/spark/sql/SQLQuerySuite.scala | 47 +++++
.../OptimizeMetadataOnlyQuerySuite.scala | 150 +++++++++++++++
.../OptimizeHiveMetadataOnlyQuerySuite.scala | 78 ++++++++
.../sql/hive/execution/SQLQuerySuite.scala | 100 ++++++++++
7 files changed, 560 insertions(+)
create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 516193bdeaaa8..f6113447a1484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -581,6 +581,15 @@ object SQLConf {
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
+ val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
+ .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
+ "to produce the partition columns instead of table scans. It applies when all the columns " +
+ "scanned are partition columns and the query has an aggregate operator that satisfies " +
+ "distinct semantics. By default the optimization is disabled, since it may return " +
+ "incorrect results with empty tables.")
+ .booleanConf
+ .createWithDefault(false)
+
val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
.doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
"to parse.")
@@ -1775,6 +1784,8 @@ class SQLConf extends Serializable with Logging {
def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
+ def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
+
def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
def wholeStageUseIdInClassName: Boolean = getConf(WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
new file mode 100644
index 0000000000000..3ca03ab2939aa
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ * e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ if (!SQLConf.get.optimizerMetadataOnly) {
+ return plan
+ }
+
+ plan.transform {
+ case a @ Aggregate(_, aggExprs, child @ PhysicalOperation(
+ projectList, filters, PartitionedRelation(partAttrs, rel))) =>
+ // We only apply this optimization when only partitioned attributes are scanned.
+ if (AttributeSet((projectList ++ filters).flatMap(_.references)).subsetOf(partAttrs)) {
+ // The project list and filters all only refer to partition attributes, which means the
+ // the Aggregator operator can also only refer to partition attributes, and filters are
+ // all partition filters. This is a metadata only query we can optimize.
+ val aggFunctions = aggExprs.flatMap(_.collect {
+ case agg: AggregateExpression => agg
+ })
+ val isAllDistinctAgg = aggFunctions.forall { agg =>
+ agg.isDistinct || (agg.aggregateFunction match {
+ // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+ // they have DISTINCT keyword or not, as the result will be same.
+ case _: Max => true
+ case _: Min => true
+ case _: First => true
+ case _: Last => true
+ case _ => false
+ })
+ }
+ if (isAllDistinctAgg) {
+ a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
+ } else {
+ a
+ }
+ } else {
+ a
+ }
+ }
+ }
+
+ /**
+ * Returns the partition attributes of the table relation plan.
+ */
+ private def getPartitionAttrs(
+ partitionColumnNames: Seq[String],
+ relation: LogicalPlan): Seq[Attribute] = {
+ val attrMap = relation.output.map(a => a.name.toLowerCase(Locale.ROOT) -> a).toMap
+ partitionColumnNames.map { colName =>
+ attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
+ throw new AnalysisException(s"Unable to find the column `$colName` " +
+ s"given [${relation.output.map(_.name).mkString(", ")}]")
+ )
+ }
+ }
+
+ /**
+ * Transform the given plan, find its table scan nodes that matches the given relation, and then
+ * replace the table scan node with its corresponding partition values.
+ */
+ private def replaceTableScanWithPartitionMetadata(
+ child: LogicalPlan,
+ relation: LogicalPlan,
+ partFilters: Seq[Expression]): LogicalPlan = {
+ // this logic comes from PruneFileSourcePartitions. it ensures that the filter names match the
+ // relation's schema. PartitionedRelation ensures that the filters only reference partition cols
+ val normalizedFilters = partFilters.map { e =>
+ e transform {
+ case a: AttributeReference =>
+ a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+ }
+ }
+
+ child transform {
+ case plan if plan eq relation =>
+ relation match {
+ case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
+ val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+ val partitionData = fsRelation.location.listFiles(normalizedFilters, Nil)
+ LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)
+
+ case relation: HiveTableRelation =>
+ val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+ val caseInsensitiveProperties =
+ CaseInsensitiveMap(relation.tableMeta.storage.properties)
+ val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
+ .getOrElse(SQLConf.get.sessionLocalTimeZone)
+ val partitions = if (partFilters.nonEmpty) {
+ catalog.listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters)
+ } else {
+ catalog.listPartitions(relation.tableMeta.identifier)
+ }
+
+ val partitionData = partitions.map { p =>
+ InternalRow.fromSeq(partAttrs.map { attr =>
+ Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
+ })
+ }
+ LocalRelation(partAttrs, partitionData)
+
+ case _ =>
+ throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
+ s"please turn off ${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try again.")
+ }
+ }
+ }
+
+ /**
+ * A pattern that finds the partitioned table relation node inside the given plan, and returns a
+ * pair of the partition attributes and the table relation node.
+ */
+ object PartitionedRelation extends PredicateHelper {
+
+ def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = {
+ plan match {
+ case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+ if fsRelation.partitionSchema.nonEmpty =>
+ val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
+ Some((partAttrs, l))
+
+ case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+ val partAttrs = AttributeSet(
+ getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
+ Some((partAttrs, relation))
+
+ case _ => None
+ }
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 5e5255ec91e8c..6c6d344240cea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -30,6 +30,7 @@ class SparkOptimizer(
extends Optimizer(catalog) {
override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Extract Python UDFs", Once,
Seq(ExtractPythonUDFFromAggregate, ExtractPythonUDFs): _*) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2a33101c23dbe..68cd3a0f3eef3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2422,6 +2422,42 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(s"$expected") :: Nil)
}
+ ignore("SPARK-15752 optimize metadata only query for datasource table") {
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+ withTable("srcpart_15752") {
+ val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "a" else "b"))
+ .toDF("col1", "col2", "partcol1", "partcol2")
+ data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart_15752")
+ checkAnswer(
+ sql("select partcol1 from srcpart_15752 group by partcol1"),
+ Row(0) :: Row(1) :: Nil)
+ checkAnswer(
+ sql("select partcol1 from srcpart_15752 where partcol1 = 1 group by partcol1"),
+ Row(1))
+ checkAnswer(
+ sql("select partcol1, count(distinct partcol2) from srcpart_15752 group by partcol1"),
+ Row(0, 1) :: Row(1, 1) :: Nil)
+ checkAnswer(
+ sql("select partcol1, count(distinct partcol2) from srcpart_15752 where partcol1 = 1 " +
+ "group by partcol1"),
+ Row(1, 1) :: Nil)
+ checkAnswer(sql("select distinct partcol1 from srcpart_15752"), Row(0) :: Row(1) :: Nil)
+ checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
+ checkAnswer(
+ sql("select distinct col from (select partcol1 + 1 as col from srcpart_15752 " +
+ "where partcol1 = 1) t"),
+ Row(2))
+ checkAnswer(sql("select max(partcol1) from srcpart_15752"), Row(1))
+ checkAnswer(sql("select max(partcol1) from srcpart_15752 where partcol1 = 1"), Row(1))
+ checkAnswer(sql("select max(partcol1) from (select partcol1 from srcpart_15752) t"), Row(1))
+ checkAnswer(
+ sql("select max(col) from (select partcol1 + 1 as col from srcpart_15752 " +
+ "where partcol1 = 1) t"),
+ Row(2))
+ }
+ }
+ }
+
test("SPARK-16975: Column-partition path starting '_' should be handled correctly") {
withTempDir { dir =>
val dataDir = new File(dir, "data").getCanonicalPath
@@ -2930,6 +2966,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}
+
+ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+ withTable("t") {
+ sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+ sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+ checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+ checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+ }
+ }
+ }
}
case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
new file mode 100644
index 0000000000000..f37802309be97
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.io.File
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
+ .toDF("col1", "col2", "partcol1", "partcol2")
+ data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
+ }
+
+ override protected def afterAll(): Unit = {
+ try {
+ sql("DROP TABLE IF EXISTS srcpart")
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+ val localRelations = df.queryExecution.optimizedPlan.collect {
+ case l @ LocalRelation(_, _, _) => l
+ }
+ assert(localRelations.size == 1)
+ }
+
+ private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+ val localRelations = df.queryExecution.optimizedPlan.collect {
+ case l @ LocalRelation(_, _, _) => l
+ }
+ assert(localRelations.size == 0)
+ }
+
+ private def testMetadataOnly(name: String, sqls: String*): Unit = {
+ ignore(name) {
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+ sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
+ }
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+ sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+ }
+ }
+ }
+
+ private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
+ ignore(name) {
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+ sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+ }
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+ sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+ }
+ }
+ }
+
+ testMetadataOnly(
+ "Aggregate expression is partition columns",
+ "select partcol1 from srcpart group by partcol1",
+ "select partcol2 from srcpart where partcol1 = 0 group by partcol2")
+
+ testMetadataOnly(
+ "Distinct aggregate function on partition columns",
+ "SELECT partcol1, count(distinct partcol2) FROM srcpart group by partcol1",
+ "SELECT partcol1, count(distinct partcol2) FROM srcpart where partcol1 = 0 group by partcol1")
+
+ testMetadataOnly(
+ "Distinct on partition columns",
+ "select distinct partcol1, partcol2 from srcpart",
+ "select distinct c1 from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t")
+
+ testMetadataOnly(
+ "Aggregate function on partition columns which have same result w or w/o DISTINCT keyword",
+ "select max(partcol1) from srcpart",
+ "select min(partcol1) from srcpart where partcol1 = 0",
+ "select first(partcol1) from srcpart",
+ "select last(partcol1) from srcpart where partcol1 = 0",
+ "select partcol2, min(partcol1) from srcpart where partcol1 = 0 group by partcol2",
+ "select max(c1) from (select partcol1 + 1 as c1 from srcpart where partcol1 = 0) t")
+
+ testNotMetadataOnly(
+ "Don't optimize metadata only query for non-partition columns",
+ "select col1 from srcpart group by col1",
+ "select partcol1, max(col1) from srcpart group by partcol1",
+ "select partcol1, count(distinct col1) from srcpart group by partcol1",
+ "select distinct partcol1, col1 from srcpart")
+
+ testNotMetadataOnly(
+ "Don't optimize metadata only query for non-distinct aggregate function on partition columns",
+ "select partcol1, sum(partcol2) from srcpart group by partcol1",
+ "select partcol1, count(partcol2) from srcpart group by partcol1")
+
+ testNotMetadataOnly(
+ "Don't optimize metadata only query for GroupingSet/Union operator",
+ "select partcol1, max(partcol2) from srcpart where partcol1 = 0 group by rollup (partcol1)",
+ "select partcol2 from (select partcol2 from srcpart where partcol1 = 0 union all " +
+ "select partcol2 from srcpart where partcol1 = 1) t group by partcol2")
+
+ test("SPARK-21884 Fix StackOverflowError on MetadataOnlyQuery") {
+ withTable("t_1000") {
+ sql("CREATE TABLE t_1000 (a INT, p INT) USING PARQUET PARTITIONED BY (p)")
+ (1 to 1000).foreach(p => sql(s"ALTER TABLE t_1000 ADD PARTITION (p=$p)"))
+ sql("SELECT COUNT(DISTINCT p) FROM t_1000").collect()
+ }
+ }
+
+ test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
+ withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+ withTempPath { path =>
+ val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+ Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+ .write.json(tablePath.getCanonicalPath)
+
+ val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
+ checkAnswer(df, Row("a", "e", "c"))
+
+ val localRelation = df.queryExecution.optimizedPlan.collectFirst {
+ case l: LocalRelation => l
+ }
+ assert(localRelation.nonEmpty, "expect to see a LocalRelation")
+ assert(localRelation.get.output.map(_.name) == Seq("cOl3", "cOl1", "cOl5"))
+ }
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
new file mode 100644
index 0000000000000..1e525c46a9cfb
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleton
+ with BeforeAndAfter with SQLTestUtils {
+
+ import spark.implicits._
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
+ (0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
+ }
+
+ override protected def afterAll(): Unit = {
+ try {
+ sql("DROP TABLE IF EXISTS metadata_only")
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
+ withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+ val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
+
+ // verify the number of matching partitions
+ assert(sql("SELECT DISTINCT part FROM metadata_only WHERE part < 5").collect().length === 5)
+
+ // verify that the partition predicate was pushed down to the metastore
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount === 5)
+ }
+ }
+
+ test("SPARK-23877: filter on projected expression") {
+ withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+ val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
+
+ // verify the matching partitions
+ val partitions = spark.internalCreateDataFrame(Distinct(Filter(($"x" < 5).expr,
+ Project(Seq(($"part" + 1).as("x").expr.asInstanceOf[NamedExpression]),
+ spark.table("metadata_only").logicalPlan.asInstanceOf[SubqueryAlias].child)))
+ .queryExecution.toRdd, StructType(Seq(StructField("x", IntegerType))))
+
+ checkAnswer(partitions, Seq(1, 2, 3, 4).toDF("x"))
+
+ // verify that the partition predicate was not pushed down to the metastore
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount == 11)
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 01c40d1541734..df4d536a9c9c2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -86,6 +86,17 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(message.contains("Table or view not found"))
}
+ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+ withTable("t") {
+ sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+ sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+ checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+ checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+ }
+ }
+ }
+
test("script") {
assume(TestUtils.testCommandAvailable("/bin/bash"))
assume(TestUtils.testCommandAvailable("echo | sed"))
@@ -1770,6 +1781,95 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ ignore("SPARK-15752 optimize metadata only query for hive table") {
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+ withTable("data_15752", "srcpart_15752", "srctext_15752") {
+ val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
+ df.createOrReplaceTempView("data_15752")
+ sql(
+ """
+ |CREATE TABLE srcpart_15752 (col1 INT, col2 STRING)
+ |PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS parquet
+ """.stripMargin)
+ for (partcol1 <- Seq(0, 1); partcol2 <- Seq("a", "b")) {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE srcpart_15752
+ |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
+ |select key, value from data_15752
+ """.stripMargin)
+ }
+ checkAnswer(
+ sql("select partcol1 from srcpart_15752 group by partcol1"),
+ Row(0) :: Row(1) :: Nil)
+ checkAnswer(
+ sql("select partcol1 from srcpart_15752 where partcol1 = 1 group by partcol1"),
+ Row(1))
+ checkAnswer(
+ sql("select partcol1, count(distinct partcol2) from srcpart_15752 group by partcol1"),
+ Row(0, 2) :: Row(1, 2) :: Nil)
+ checkAnswer(
+ sql("select partcol1, count(distinct partcol2) from srcpart_15752 where partcol1 = 1 " +
+ "group by partcol1"),
+ Row(1, 2) :: Nil)
+ checkAnswer(sql("select distinct partcol1 from srcpart_15752"), Row(0) :: Row(1) :: Nil)
+ checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
+ checkAnswer(
+ sql("select distinct col from (select partcol1 + 1 as col from srcpart_15752 " +
+ "where partcol1 = 1) t"),
+ Row(2))
+ checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 1"), Row(1))
+ checkAnswer(sql("select max(partcol1) from srcpart_15752"), Row(1))
+ checkAnswer(sql("select max(partcol1) from srcpart_15752 where partcol1 = 1"), Row(1))
+ checkAnswer(sql("select max(partcol1) from (select partcol1 from srcpart_15752) t"), Row(1))
+ checkAnswer(
+ sql("select max(col) from (select partcol1 + 1 as col from srcpart_15752 " +
+ "where partcol1 = 1) t"),
+ Row(2))
+
+ sql(
+ """
+ |CREATE TABLE srctext_15752 (col1 INT, col2 STRING)
+ |PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS textfile
+ """.stripMargin)
+ for (partcol1 <- Seq(0, 1); partcol2 <- Seq("a", "b")) {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE srctext_15752
+ |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
+ |select key, value from data_15752
+ """.stripMargin)
+ }
+ checkAnswer(
+ sql("select partcol1 from srctext_15752 group by partcol1"),
+ Row(0) :: Row(1) :: Nil)
+ checkAnswer(
+ sql("select partcol1 from srctext_15752 where partcol1 = 1 group by partcol1"),
+ Row(1))
+ checkAnswer(
+ sql("select partcol1, count(distinct partcol2) from srctext_15752 group by partcol1"),
+ Row(0, 2) :: Row(1, 2) :: Nil)
+ checkAnswer(
+ sql("select partcol1, count(distinct partcol2) from srctext_15752 where partcol1 = 1 " +
+ "group by partcol1"),
+ Row(1, 2) :: Nil)
+ checkAnswer(sql("select distinct partcol1 from srctext_15752"), Row(0) :: Row(1) :: Nil)
+ checkAnswer(sql("select distinct partcol1 from srctext_15752 where partcol1 = 1"), Row(1))
+ checkAnswer(
+ sql("select distinct col from (select partcol1 + 1 as col from srctext_15752 " +
+ "where partcol1 = 1) t"),
+ Row(2))
+ checkAnswer(sql("select max(partcol1) from srctext_15752"), Row(1))
+ checkAnswer(sql("select max(partcol1) from srctext_15752 where partcol1 = 1"), Row(1))
+ checkAnswer(sql("select max(partcol1) from (select partcol1 from srctext_15752) t"), Row(1))
+ checkAnswer(
+ sql("select max(col) from (select partcol1 + 1 as col from srctext_15752 " +
+ "where partcol1 = 1) t"),
+ Row(2))
+ }
+ }
+ }
+
test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
sql(
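A minimal sketch of the behaviour the SPARK-26709 tests above exercise, assuming a local
SparkSession; the table name `demo_26709` is illustrative and not part of the patch. With
`spark.sql.optimizer.metadataOnly` enabled, MAX over a partition column is answered from the
catalog's partition metadata, so a partition that is registered but holds no rows still
contributes a value:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("metadata-only-sketch").getOrCreate()
    import spark.sql

    sql("CREATE TABLE demo_26709 (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
    // range(1, 1) is empty, so partition p1=5 is registered in the catalog but contains no rows.
    sql("INSERT INTO TABLE demo_26709 PARTITION (p1 = 5) SELECT id FROM range(1, 1)")

    spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
    sql("SELECT MAX(p1) FROM demo_26709").show()   // 5: taken from the partition metadata

    spark.conf.set("spark.sql.optimizer.metadataOnly", "false")
    sql("SELECT MAX(p1) FROM demo_26709").show()   // NULL: the table contains no rows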
From 8399d773ddcab959daa0d75b8795757f9ca69a88 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Fri, 25 Jan 2019 02:14:11 +0800
Subject: [PATCH 4/7] address comments
---
.../apache/spark/sql/internal/SQLConf.scala | 1 +
.../execution/OptimizeMetadataOnlyQuery.scala | 4 ++
.../org/apache/spark/sql/SQLQuerySuite.scala | 40 +++++++++++++++----
.../OptimizeMetadataOnlyQuerySuite.scala | 4 +-
.../sql/hive/execution/SQLQuerySuite.scala | 31 ++++++++------
5 files changed, 59 insertions(+), 21 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f6113447a1484..4e3dfdc714c80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -582,6 +582,7 @@ object SQLConf {
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
+ .internal()
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
"scanned are partition columns and the query has an aggregate operator that satisfies " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 3ca03ab2939aa..9bda36124966c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -72,6 +72,10 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
})
}
if (isAllDistinctAgg) {
+ logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " +
+ "Spark will scan partition-level metadata without scanning data files. " +
+ "This could result in wrong results when with empty partition data."
+ )
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
} else {
a
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 68cd3a0f3eef3..b8c4d73f1b2b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2422,7 +2422,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(s"$expected") :: Nil)
}
- ignore("SPARK-15752 optimize metadata only query for datasource table") {
+ test("SPARK-15752 optimize metadata only query for datasource table") {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
withTable("srcpart_15752") {
val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "a" else "b"))
@@ -2968,12 +2968,38 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
- withTable("t") {
- sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
- sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
- checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
- checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+ Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
+ withTable("t") {
+ sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+ sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+ if (enableOptimizeMetadataOnlyQuery) {
+ // The result is wrong if we enable the configuration.
+ checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
+ } else {
+ checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+ }
+ checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+ }
+
+ withTempPath { path =>
+ val tabLocation = path.getCanonicalPath
+ val partLocation1 = tabLocation + "/p=3"
+ val partLocation2 = tabLocation + "/p=1"
+ // SPARK-23271 empty RDD when saved should write a metadata only file
+ val df = spark.emptyDataFrame.select(lit(1).as("col"))
+ df.write.parquet(partLocation1)
+ val df2 = spark.range(10).toDF("col")
+ df2.write.parquet(partLocation2)
+ val readDF = spark.read.parquet(tabLocation)
+ if (enableOptimizeMetadataOnlyQuery) {
+ // The result is wrong if we enable the configuration.
+ checkAnswer(readDF.selectExpr("max(p)"), Row(3))
+ } else {
+ checkAnswer(readDF.selectExpr("max(p)"), Row(1))
+ }
+ checkAnswer(readDF.selectExpr("max(col)"), Row(9))
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index f37802309be97..a543eb8351656 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -58,7 +58,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
}
private def testMetadataOnly(name: String, sqls: String*): Unit = {
- ignore(name) {
+ test(name) {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
}
@@ -69,7 +69,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
}
private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
- ignore(name) {
+ test(name) {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index df4d536a9c9c2..d506edc0cf088 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -86,17 +86,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(message.contains("Table or view not found"))
}
- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
- withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
- withTable("t") {
- sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
- sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
- checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
- checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
- }
- }
- }
-
test("script") {
assume(TestUtils.testCommandAvailable("/bin/bash"))
assume(TestUtils.testCommandAvailable("echo | sed"))
@@ -1781,7 +1770,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- ignore("SPARK-15752 optimize metadata only query for hive table") {
+ test("SPARK-15752 optimize metadata only query for hive table") {
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
withTable("data_15752", "srcpart_15752", "srctext_15752") {
val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
@@ -2341,4 +2330,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+ Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
+ withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
+ withTable("t") {
+ sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+ sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+ if (enableOptimizeMetadataOnlyQuery) {
+ // The result is wrong if we enable the configuration.
+ checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
+ } else {
+ checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+ }
+ checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+ }
+ }
+ }
+ }
+
}
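Under the same illustrative assumptions as the earlier sketch (session `spark`, table
`demo_26709` partitioned by `p1`), whether the metadata-only rewrite actually fired for a
given query can be checked the way the suites above do it: by looking for a LocalRelation,
which carries only the partition values listed in the catalog, in the optimized plan:

    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
    val distinctParts = spark.table("demo_26709").select("p1").distinct()
    // When OptimizeMetadataOnlyQuery applies, the Parquet scan is replaced by a LocalRelation
    // holding only the catalog's partition values for p1.
    val rewritten = distinctParts.queryExecution.optimizedPlan.collectFirst {
      case l: LocalRelation => l
    }.nonEmpty
    println(s"metadata-only rewrite applied: $rewritten")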
From 1524205866e745de21d9e574f9b8f3883ee51c4a Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Thu, 24 Jan 2019 15:18:47 -0800
Subject: [PATCH 5/7] Update SQLConf.scala
---
.../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4e3dfdc714c80..0f013cea87b14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -587,7 +587,7 @@ object SQLConf {
"to produce the partition columns instead of table scans. It applies when all the columns " +
"scanned are partition columns and the query has an aggregate operator that satisfies " +
"distinct semantics. By default the optimization is disabled, since it may return " +
- "incorrect results with empty tables.")
+ "incorrect results when the files are empty..")
.booleanConf
.createWithDefault(false)
From f7dac39da786e6a787426233db6c5b56897efd07 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Thu, 24 Jan 2019 15:19:14 -0800
Subject: [PATCH 6/7] Update SQLConf.scala
---
.../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0f013cea87b14..da595e7a352db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -587,7 +587,7 @@ object SQLConf {
"to produce the partition columns instead of table scans. It applies when all the columns " +
"scanned are partition columns and the query has an aggregate operator that satisfies " +
"distinct semantics. By default the optimization is disabled, since it may return " +
- "incorrect results when the files are empty..")
+ "incorrect results when the files are empty.")
.booleanConf
.createWithDefault(false)
From 9e99d4bfc1ec15d74b29bb13f9260faf53b38451 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Thu, 24 Jan 2019 15:20:10 -0800
Subject: [PATCH 7/7] Update OptimizeMetadataOnlyQuery.scala
---
.../apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 9bda36124966c..45e5f415e8da1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -74,7 +74,8 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
if (isAllDistinctAgg) {
logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " +
"Spark will scan partition-level metadata without scanning data files. " +
- "This could result in wrong results when with empty partition data."
+ "This could result in wrong results when the partition metadata exists but the " +
+ "inclusive data files are empty."
)
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
} else {