Skip to content

Commit 8d957d7

Browse files
[SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly
## What changes were proposed in this pull request? When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results: ``` sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)") sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)") sql("SELECT MAX(p1) FROM t") ``` The result is supposed to be `null`. However, with the optimization the result is `5`. The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in #13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem. It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default. ## How was this patch tested? Unit test Closes #23635 from gengliangwang/optimizeMetadata. Lead-authored-by: Gengliang Wang <[email protected]> Co-authored-by: Xiao Li <[email protected]> Signed-off-by: gatorsmile <[email protected]> (cherry picked from commit f5b9370) Signed-off-by: gatorsmile <[email protected]>
1 parent e8e9b11 commit 8d957d7

File tree

5 files changed

+64
-14
lines changed

5 files changed

+64
-14
lines changed

docs/sql-data-sources-parquet.md

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -295,18 +295,6 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
295295
</p>
296296
</td>
297297
</tr>
298-
<tr>
299-
<td><code>spark.sql.optimizer.metadataOnly</code></td>
300-
<td>true</td>
301-
<td>
302-
<p>
303-
When true, enable the metadata-only query optimization that use the table's metadata to
304-
produce the partition columns instead of table scans. It applies when all the columns scanned
305-
are partition columns and the query has an aggregate operator that satisfies distinct
306-
semantics.
307-
</p>
308-
</td>
309-
</tr>
310298
<tr>
311299
<td><code>spark.sql.parquet.writeLegacyFormat</code></td>
312300
<td>false</td>

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,12 +562,14 @@ object SQLConf {
562562
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
563563

564564
val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
565+
.internal()
565566
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
566567
"to produce the partition columns instead of table scans. It applies when all the columns " +
567568
"scanned are partition columns and the query has an aggregate operator that satisfies " +
568-
"distinct semantics.")
569+
"distinct semantics. By default the optimization is disabled, since it may return " +
570+
"incorrect results when the files are empty.")
569571
.booleanConf
570-
.createWithDefault(true)
572+
.createWithDefault(false)
571573

572574
val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
573575
.doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +

sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
7272
})
7373
}
7474
if (isAllDistinctAgg) {
75+
logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " +
76+
"Spark will scan partition-level metadata without scanning data files. " +
77+
"This could result in wrong results when the partition metadata exists but the " +
78+
"inclusive data files are empty."
79+
)
7580
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
7681
} else {
7782
a

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2966,6 +2966,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
29662966
}
29672967
}
29682968
}
2969+
2970+
test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
2971+
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
2972+
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
2973+
withTable("t") {
2974+
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
2975+
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
2976+
if (enableOptimizeMetadataOnlyQuery) {
2977+
// The result is wrong if we enable the configuration.
2978+
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
2979+
} else {
2980+
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
2981+
}
2982+
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
2983+
}
2984+
2985+
withTempPath { path =>
2986+
val tabLocation = path.getCanonicalPath
2987+
val partLocation1 = tabLocation + "/p=3"
2988+
val partLocation2 = tabLocation + "/p=1"
2989+
// SPARK-23271 empty RDD when saved should write a metadata only file
2990+
val df = spark.emptyDataFrame.select(lit(1).as("col"))
2991+
df.write.parquet(partLocation1)
2992+
val df2 = spark.range(10).toDF("col")
2993+
df2.write.parquet(partLocation2)
2994+
val readDF = spark.read.parquet(tabLocation)
2995+
if (enableOptimizeMetadataOnlyQuery) {
2996+
// The result is wrong if we enable the configuration.
2997+
checkAnswer(readDF.selectExpr("max(p)"), Row(3))
2998+
} else {
2999+
checkAnswer(readDF.selectExpr("max(p)"), Row(1))
3000+
}
3001+
checkAnswer(readDF.selectExpr("max(col)"), Row(9))
3002+
}
3003+
}
3004+
}
3005+
}
29693006
}
29703007

29713008
case class Foo(bar: Option[String])

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2290,4 +2290,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
22902290
}
22912291
}
22922292

2293+
test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
2294+
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
2295+
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
2296+
withTable("t") {
2297+
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
2298+
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
2299+
if (enableOptimizeMetadataOnlyQuery) {
2300+
// The result is wrong if we enable the configuration.
2301+
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
2302+
} else {
2303+
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
2304+
}
2305+
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
2306+
}
2307+
}
2308+
}
2309+
}
2310+
22932311
}

0 commit comments

Comments
 (0)