From d782e4ace443558ec52421a1959c85aa68dfb998 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 24 Jan 2019 18:24:49 -0800
Subject: [PATCH 1/4] [SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:

```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```

The result is supposed to be `null`. However, with the optimization enabled, the result is `5`.

The rule was originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in #13494. In Hive, the rule was disabled by default in a later release (https://issues.apache.org/jira/browse/HIVE-15397) due to the same problem.

It is hard to completely avoid the correctness issue, because data sources such as Parquet can produce metadata-only files: Spark cannot tell whether a partition is empty without actually reading its data files.

This PR disables the optimization by default.

Unit test

Closes #23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang
Co-authored-by: Xiao Li
Signed-off-by: gatorsmile
---
 docs/sql-programming-guide.md                 | 12 ------
 .../apache/spark/sql/internal/SQLConf.scala   |  6 ++-
 .../execution/OptimizeMetadataOnlyQuery.scala |  5 +++
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 37 +++++++++++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala    | 17 +++++++++
 5 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index e5fa4c686ea8f..038c1ecb2a65f 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -990,18 +990,6 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession

-<tr>
-  <td><code>spark.sql.optimizer.metadataOnly</code></td>
-  <td>true</td>
-  <td>

-    When true, enable the metadata-only query optimization that use the table's metadata to
-    produce the partition columns instead of table scans. It applies when all the columns scanned
-    are partition columns and the query has an aggregate operator that satisfies distinct
-    semantics.

-  </td>
-</tr>
 
 ## ORC Files
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 731d4e30483b5..c77c4f2f13dc7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -469,12 +469,14 @@ object SQLConf {
     .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
 
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
+    .internal()
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
       "scanned are partition columns and the query has an aggregate operator that satisfies " +
-      "distinct semantics.")
+      "distinct semantics. By default the optimization is disabled, since it may return " +
+      "incorrect results when the files are empty.")
     .booleanConf
-    .createWithDefault(true)
+    .createWithDefault(false)
 
   val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
     .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index dc4aff9f12580..fff32c8e1dd6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -67,6 +67,11 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
           })
         }
         if (isAllDistinctAgg) {
+          logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " +
+            "Spark will scan partition-level metadata without scanning data files. " +
+            "This could result in wrong results when the partition metadata exists but the " +
+            "inclusive data files are empty."
+          )
           a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
         } else {
           a
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6848b66514ef8..fba496e726ad3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2869,6 +2869,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+    Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
+        withTable("t") {
+          sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+          sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+          if (enableOptimizeMetadataOnlyQuery) {
+            // The result is wrong if we enable the configuration.
+            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
+          } else {
+            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+          }
+          checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+        }
+
+        withTempPath { path =>
+          val tabLocation = path.getCanonicalPath
+          val partLocation1 = tabLocation + "/p=3"
+          val partLocation2 = tabLocation + "/p=1"
+          // SPARK-23271 empty RDD when saved should write a metadata only file
+          val df = spark.emptyDataFrame.select(lit(1).as("col"))
+          df.write.parquet(partLocation1)
+          val df2 = spark.range(10).toDF("col")
+          df2.write.parquet(partLocation2)
+          val readDF = spark.read.parquet(tabLocation)
+          if (enableOptimizeMetadataOnlyQuery) {
+            // The result is wrong if we enable the configuration.
+            checkAnswer(readDF.selectExpr("max(p)"), Row(3))
+          } else {
+            checkAnswer(readDF.selectExpr("max(p)"), Row(1))
+          }
+          checkAnswer(readDF.selectExpr("max(col)"), Row(9))
+        }
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 081d854d771a7..47016772efd04 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2185,4 +2185,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+    Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
+        withTable("t") {
+          sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+          sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+          if (enableOptimizeMetadataOnlyQuery) {
+            // The result is wrong if we enable the configuration.
+            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
+          } else {
+            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+          }
+          checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+        }
+      }
+    }
+  }
 }

From deb84ea88a3a94bd86ca25dcfa5123b878c4286c Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Fri, 25 Jan 2019 19:39:16 +0800
Subject: [PATCH 2/4] update test case

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 47016772efd04..d11270e90c90b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2189,7 +2189,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
       withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
         withTable("t") {
-          sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+          sql("CREATE TABLE t (col1 INT) PARTITIONED BY (p1 INT)")
           sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
           if (enableOptimizeMetadataOnlyQuery) {
             // The result is wrong if we enable the configuration.
From bf280cfd1e628df41e393a889d3cdf0406c81835 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Sat, 26 Jan 2019 00:32:58 +0800
Subject: [PATCH 3/4] remove test case in SQLQuerySuite

---
 .../org/apache/spark/sql/SQLQuerySuite.scala | 37 -------------------
 1 file changed, 37 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index fba496e726ad3..6848b66514ef8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2869,43 +2869,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
-
-  test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
-    Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
-      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
-        withTable("t") {
-          sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
-          sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
-          if (enableOptimizeMetadataOnlyQuery) {
-            // The result is wrong if we enable the configuration.
-            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
-          } else {
-            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
-          }
-          checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
-        }
-
-        withTempPath { path =>
-          val tabLocation = path.getCanonicalPath
-          val partLocation1 = tabLocation + "/p=3"
-          val partLocation2 = tabLocation + "/p=1"
-          // SPARK-23271 empty RDD when saved should write a metadata only file
-          val df = spark.emptyDataFrame.select(lit(1).as("col"))
-          df.write.parquet(partLocation1)
-          val df2 = spark.range(10).toDF("col")
-          df2.write.parquet(partLocation2)
-          val readDF = spark.read.parquet(tabLocation)
-          if (enableOptimizeMetadataOnlyQuery) {
-            // The result is wrong if we enable the configuration.
-            checkAnswer(readDF.selectExpr("max(p)"), Row(3))
-          } else {
-            checkAnswer(readDF.selectExpr("max(p)"), Row(1))
-          }
-          checkAnswer(readDF.selectExpr("max(col)"), Row(9))
-        }
-      }
-    }
-  }
 }
 
 case class Foo(bar: Option[String])

From 096bb6c850072e6db29f3f21cd37cc4276c06bf3 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Sat, 26 Jan 2019 00:54:46 +0800
Subject: [PATCH 4/4] add back test case

---
 .../org/apache/spark/sql/SQLQuerySuite.scala | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6848b66514ef8..7c4703f19649d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2869,6 +2869,31 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+    Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
+        withTempPath { path =>
+          val tabLocation = path.getCanonicalPath
+          val partLocation1 = tabLocation + "/p=3"
+          val partLocation2 = tabLocation + "/p=1"
+          // SPARK-23271 empty RDD when saved should write a metadata only file
+          val df = spark.range(10).filter($"id" < 0).toDF("col")
+          df.write.parquet(partLocation1)
+          val df2 = spark.range(10).toDF("col")
+          df2.write.parquet(partLocation2)
+          val readDF = spark.read.parquet(tabLocation)
+          if (enableOptimizeMetadataOnlyQuery) {
+            // The result is wrong if we enable the configuration.
+            checkAnswer(readDF.selectExpr("max(p)"), Row(3))
+          } else {
+            checkAnswer(readDF.selectExpr("max(p)"), Row(1))
+          }
+          checkAnswer(readDF.selectExpr("max(col)"), Row(9))
+        }
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
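For reviewers who want to try the behavior described in the first commit message outside of the test suites, here is an illustrative spark-shell sketch (not part of the patch; it assumes a build that still recognizes the now-internal `spark.sql.optimizer.metadataOnly` flag). It creates the empty partition from the commit message and toggles the configuration around the same query:

```
// A partition whose metastore entry exists but whose data files hold no rows.
spark.sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
spark.sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")

// New default (false): the data files are scanned, so MAX(p1) is correctly NULL.
spark.conf.set("spark.sql.optimizer.metadataOnly", "false")
spark.sql("SELECT MAX(p1) FROM t").show()

// Opting back in answers the query from partition metadata alone and returns 5
// even though the partition is empty, which is the wrong result this patch avoids by default.
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
spark.sql("SELECT MAX(p1) FROM t").show()
```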