12 changes: 0 additions & 12 deletions docs/sql-programming-guide.md
@@ -990,18 +990,6 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession`
    </p>
  </td>
</tr>
- <tr>
-   <td><code>spark.sql.optimizer.metadataOnly</code></td>
-   <td>true</td>
-   <td>
-     <p>
-       When true, enable the metadata-only query optimization that use the table's metadata to
-       produce the partition columns instead of table scans. It applies when all the columns scanned
-       are partition columns and the query has an aggregate operator that satisfies distinct
-       semantics.
-     </p>
-   </td>
- </tr>
</table>
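The entry above is removed from the user-facing guide because this PR makes the flag internal and turns it off by default. For completeness, a minimal sketch (not part of this change) of how the flag can still be toggled at runtime, assuming a running `SparkSession` named `spark`; re-enabling it restores the old behavior and with it the risk of wrong answers for partitions whose data files are empty:

```scala
// Minimal sketch, not from this PR: toggling the now-internal flag at runtime.
// Assumes an existing SparkSession named `spark`.
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")      // opt back in, at your own risk
spark.sql("SET spark.sql.optimizer.metadataOnly").show(false)   // inspect the current value
```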

## ORC Files
@@ -469,12 +469,14 @@ object SQLConf {
    .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)

  val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
+   .internal()
    .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
      "to produce the partition columns instead of table scans. It applies when all the columns " +
      "scanned are partition columns and the query has an aggregate operator that satisfies " +
-     "distinct semantics.")
+     "distinct semantics. By default the optimization is disabled, since it may return " +
+     "incorrect results when the files are empty.")
    .booleanConf
-   .createWithDefault(true)
+   .createWithDefault(false)

  val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
    .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
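The updated doc string spells out when the rule can fire: every column the query reads is a partition column, and the aggregate has distinct semantics (duplicate rows cannot change the answer, as with `MAX`, `MIN`, or `DISTINCT`). A hedged illustration of the query shapes involved, using a hypothetical table `events` partitioned by `dt`:

```scala
// Illustration only; `events` is a hypothetical table partitioned by `dt`.
spark.sql("SELECT MAX(dt) FROM events")       // only a partition column + a distinct-safe aggregate
spark.sql("SELECT DISTINCT dt FROM events")   // distinct partition values
// A query that also reads a data column, e.g. SELECT MAX(value) FROM events,
// never qualifies and always scans the files.
```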
@@ -67,6 +67,11 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] {
          })
        }
        if (isAllDistinctAgg) {
+         logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " +
+           "Spark will scan partition-level metadata without scanning data files. " +
+           "This could result in wrong results when the partition metadata exists but the " +
+           "inclusive data files are empty."
+         )
          a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
        } else {
          a
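The warning above is logged at the moment the rule decides to rewrite the plan. One hedged way to observe whether the rewrite happened is to compare optimized plans with the flag on and off (again using a hypothetical partitioned table `events`):

```scala
// Sketch, not part of this PR: the rewrite is visible in the optimized plan.
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
spark.sql("SELECT MAX(dt) FROM events").explain(true)    // should show a metadata-backed local relation; the warning is logged
spark.conf.set("spark.sql.optimizer.metadataOnly", "false")
spark.sql("SELECT MAX(dt) FROM events").explain(true)    // should show a full file scan of the table
```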
25 changes: 25 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2869,6 +2869,31 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
withTempPath { path =>
val tabLocation = path.getCanonicalPath
val partLocation1 = tabLocation + "/p=3"
val partLocation2 = tabLocation + "/p=1"
// SPARK-23271 empty RDD when saved should write a metadata only file
val df = spark.range(10).filter($"id" < 0).toDF("col")
df.write.parquet(partLocation1)
val df2 = spark.range(10).toDF("col")
df2.write.parquet(partLocation2)
val readDF = spark.read.parquet(tabLocation)
if (enableOptimizeMetadataOnlyQuery) {
// The result is wrong if we enable the configuration.
checkAnswer(readDF.selectExpr("max(p)"), Row(3))
} else {
checkAnswer(readDF.selectExpr("max(p)"), Row(1))
}
checkAnswer(readDF.selectExpr("max(col)"), Row(9))
}
}
}
}
}

case class Foo(bar: Option[String])
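The expected answers in the test above follow from how the directory ends up on disk: writing an empty DataFrame still creates the `p=3` directory (per SPARK-23271, a metadata-only Parquet file is written), so partition discovery registers `p=3` even though it contributes no rows. With the optimization on, `max(p)` is answered from that metadata and returns 3; scanning the rows returns 1. A hedged sketch of the layout, reusing the names from the test:

```scala
// Sketch of the on-disk layout produced by the test (names taken from the test itself):
//   <tabLocation>/p=3/part-*.parquet   <- written from an empty DataFrame, zero rows
//   <tabLocation>/p=1/part-*.parquet   <- ten rows, col = 0..9
// Partition discovery over <tabLocation> still lists p in {1, 3}, which is why the
// metadata-only answer to max(p) is 3 while an actual scan of the rows yields 1.
val readDF = spark.read.parquet(tabLocation)
readDF.selectExpr("max(p)").show()   // 1 under the new default, i.e. with the rule disabled
```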
@@ -2185,4 +2185,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
withTable("t") {
sql("CREATE TABLE t (col1 INT) PARTITIONED BY (p1 INT)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
if (enableOptimizeMetadataOnlyQuery) {
// The result is wrong if we enable the configuration.
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
} else {
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
}
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
}
}
}
}
}
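In the Hive variant, `range(1, 1)` produces no rows, yet the `INSERT ... PARTITION (p1 = 5)` still registers the partition in the metastore; that registration is exactly the metadata the rule trusts, which is why `MAX(p1)` returns 5 with the flag on and NULL once the data is actually scanned. A hedged way to confirm what the metastore recorded, assuming the table `t` from the test is still present in the session:

```scala
// Sketch, not part of this PR: inspecting the empty-but-registered partition.
spark.sql("SHOW PARTITIONS t").show()        // lists p1=5 even though it holds no rows
spark.sql("SELECT COUNT(*) FROM t").show()   // 0, confirming the partition data is empty
spark.sql("SELECT MAX(p1) FROM t").show()    // NULL under the new default
```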