Skip to content

Commit b143e84

Browse files
committed
[SPARK-274034][SQL] Table statistics shall be updated automatically if the auto-update feature is enabled (spark.sql.statistics.size.autoUpdate.enabled=true)
What changes were proposed in this pull request? When the user sets spark.sql.statistics.size.autoUpdate.enabled=true, statistics for a table should be computed automatically on an INSERT OVERWRITE command and recorded in the metastore. Currently this does not happen because of the `table.stats.nonEmpty` validation: statistics are never recorded for a newly created table (which has no prior stats), so the check does not hold when the auto-update feature is enabled. As part of the fix, the `autoSizeUpdateEnabled` check has been pulled up into a separate validation, which ensures that when the feature is enabled the system calculates the table size on every insert command and records it in the metastore. How was this patch tested? A unit test was written and the behavior was manually verified on a cluster; tested with unit tests plus some internal tests on a real cluster.
1 parent 8bc304f commit b143e84

File tree

2 files changed

+24
-10
lines changed

2 files changed

+24
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,14 @@ object CommandUtils extends Logging {
4242

4343
/** Change statistics after changing data by commands. */
4444
def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
45-
if (table.stats.nonEmpty) {
46-
val catalog = sparkSession.sessionState.catalog
47-
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
48-
val newTable = catalog.getTableMetadata(table.identifier)
49-
val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
50-
val newStats = CatalogStatistics(sizeInBytes = newSize)
51-
catalog.alterTableStats(table.identifier, Some(newStats))
52-
} else {
53-
catalog.alterTableStats(table.identifier, None)
54-
}
45+
val catalog = sparkSession.sessionState.catalog
46+
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
47+
val newTable = catalog.getTableMetadata(table.identifier)
48+
val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
49+
val newStats = CatalogStatistics(sizeInBytes = newSize)
50+
catalog.alterTableStats(table.identifier, Some(newStats))
51+
} else if (table.stats.nonEmpty) {
52+
catalog.alterTableStats(table.identifier, None)
5553
}
5654
}
5755

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,22 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
337337
}
338338
}
339339

340+
test("auto gather stats after insert command") {
341+
val table = "change_stats_insert_datasource_table"
342+
val autoUpdate = true
343+
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
344+
withTable(table) {
345+
sql(s"CREATE TABLE $table (i int, j string) STORED AS PARQUET")
346+
// analyze to get initial stats
347+
// insert into command
348+
sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
349+
val stats = getCatalogTable(table).stats
350+
assert(stats.isDefined)
351+
assert(stats.get.sizeInBytes >= 0)
352+
}
353+
}
354+
}
355+
340356
test("invalidation of tableRelationCache after inserts") {
341357
val table = "invalidate_catalog_cache_table"
342358
Seq(false, true).foreach { autoUpdate =>

0 commit comments

Comments
 (0)