@@ -774,6 +774,14 @@ object SQLConf {
.doubleConf
.createWithDefault(0.05)

val AUTO_UPDATE_SIZE =
buildConf("spark.sql.statistics.autoUpdate.size")
.doc("Enables automatic update for table size once table's data is changed. Note that if " +
"the total number of files of the table is very large, this can be expensive and slow " +
"down data change commands.")
.booleanConf
.createWithDefault(false)

val CBO_ENABLED =
buildConf("spark.sql.cbo.enabled")
.doc("Enables CBO for estimation of plan statistics when set true.")
@@ -1083,6 +1091,8 @@ class SQLConf extends Serializable with Logging {

def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)

def autoUpdateSize: Boolean = getConf(SQLConf.AUTO_UPDATE_SIZE)

def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)

def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
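Not part of the diff: a minimal usage sketch for the new flag registered above. The config key and its default of false come straight from the buildConf call; the session setup around it (master, app name) is purely illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session; only the config key below is defined by this PR.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("auto-update-size-sketch")
  .config("spark.sql.statistics.autoUpdate.size", "true") // default is false
  .getOrCreate()

// The flag can also be flipped at runtime for an existing session:
spark.sql("SET spark.sql.statistics.autoUpdate.size=true")
```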
@@ -36,7 +36,14 @@ object CommandUtils extends Logging {
def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
if (table.stats.nonEmpty) {
val catalog = sparkSession.sessionState.catalog
catalog.alterTableStats(table.identifier, None)
if (sparkSession.sessionState.conf.autoUpdateSize) {
val newTable = catalog.getTableMetadata(table.identifier)
val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable)
val newStats = CatalogStatistics(sizeInBytes = newSize)
catalog.alterTableStats(table.identifier, Some(newStats))
} else {
catalog.alterTableStats(table.identifier, None)
}
}
}

@@ -84,7 +91,9 @@ object CommandUtils extends Logging {
size
}

locationUri.map { p =>
val startTime = System.nanoTime()
logInfo(s"Starting to calculate the total file size under path $locationUri.")
val size = locationUri.map { p =>
val path = new Path(p)
try {
val fs = path.getFileSystem(sessionState.newHadoopConf())
@@ -97,6 +106,10 @@ object CommandUtils extends Logging {
0L
}
}.getOrElse(0L)
val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
logInfo(s"It took $durationInMs ms to calculate the total file size under path $locationUri.")
Review comment (Member): Actually, the log message contains the timestamp. It does not need to calculate the total time, but I think it is fine here.
size
}

}
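The second hunk above wraps the size calculation in a start/finish log pair and converts the elapsed nanoseconds to milliseconds. As a self-contained sketch of that pattern (plain Scala, no Spark; every name here is made up for illustration):

```scala
// Standalone sketch of the timing/logging pattern used around calculateTotalSize.
object TimingSketch {
  def timedMs[T](label: String)(body: => T): T = {
    val startTime = System.nanoTime()
    println(s"Starting to $label.")
    val result = body
    val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000) // ns -> ms
    println(s"It took $durationInMs ms to $label.")
    result
  }

  def main(args: Array[String]): Unit = {
    val size = timedMs("calculate the total file size under a fake path") {
      Thread.sleep(5) // stand-in for walking the file system
      1024L
    }
    println(s"computed size = $size")
  }
}
```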
@@ -437,7 +437,20 @@ case class AlterTableAddPartitionCommand(
}
catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)

CommandUtils.updateTableStats(sparkSession, table)
if (table.stats.nonEmpty) {
if (sparkSession.sessionState.conf.autoUpdateSize) {
val addedSize = parts.map { part =>
CommandUtils.calculateLocationSize(sparkSession.sessionState, table.identifier,
part.storage.locationUri)

Review comment (Member): In the function calculateLocationSize, please add log messages when starting/finishing the statistics collection and
Review comment (Contributor Author): OK, shall we log in info level or debug level?
Review comment (Member): INFO should be fine, because it should not be a lot, right?
}.sum
if (addedSize > 0) {
val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
catalog.alterTableStats(table.identifier, Some(newStats))
}
} else {
catalog.alterTableStats(table.identifier, None)
}
}
Seq.empty[Row]
}

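To summarize the branch added to AlterTableAddPartitionCommand above: with the flag on, the sizes of the newly added partition locations are summed and folded into the existing sizeInBytes; with it off, the stats are invalidated as before. A hedged model of that decision, using stand-in types rather than Spark's catalog classes:

```scala
// Illustrative model only; TableStats stands in for CatalogStatistics.
case class TableStats(sizeInBytes: BigInt)

def statsAfterAddPartition(
    autoUpdateSize: Boolean,
    existing: Option[TableStats],
    addedPartitionSizes: Seq[Long]): Option[TableStats] = {
  existing match {
    case Some(stats) if autoUpdateSize =>
      val addedSize = addedPartitionSizes.sum
      // Only rewrite the stats when something was actually added.
      if (addedSize > 0) Some(TableStats(stats.sizeInBytes + addedSize)) else Some(stats)
    case Some(_) =>
      // Flag off: drop the now-stale stats, mirroring alterTableStats(identifier, None).
      None
    case None =>
      None // the table had no stats to begin with, so there is nothing to update
  }
}

// statsAfterAddPartition(autoUpdateSize = true, Some(TableStats(1024)), Seq(100L, 200L))
//   returns Some(TableStats(1324))
```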
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.test.SQLTestData.ArrayData
import org.apache.spark.sql.types._
@@ -178,36 +178,63 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared

test("change stats after set location command") {
val table = "change_stats_set_location_table"
withTable(table) {
spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
val fetched1 = checkTableStats(
table, hasSizeInBytes = true, expectedRowCounts = Some(100))
assert(fetched1.get.sizeInBytes > 0)
assert(fetched1.get.colStats.size == 2)

// set location command
withTempDir { newLocation =>
sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'")
checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
withTable(table) {
spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
val fetched1 = checkTableStats(
table, hasSizeInBytes = true, expectedRowCounts = Some(100))
assert(fetched1.get.sizeInBytes > 0)
assert(fetched1.get.colStats.size == 2)

// set location command
val initLocation = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
.storage.locationUri.get.toString
withTempDir { newLocation =>
sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'")
if (autoUpdate) {
val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetched2.get.sizeInBytes == 0)
assert(fetched2.get.colStats.isEmpty)

// set back to the initial location
sql(s"ALTER TABLE $table SET LOCATION '$initLocation'")
val fetched3 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes)
} else {
checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
}
}
}
}
}
}

test("change stats after insert command for datasource table") {
val table = "change_stats_insert_datasource_table"
withTable(table) {
sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetched1.get.sizeInBytes == 0)
assert(fetched1.get.colStats.size == 2)

// insert into command
sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
withTable(table) {
sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetched1.get.sizeInBytes == 0)
assert(fetched1.get.colStats.size == 2)

// insert into command
sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
if (autoUpdate) {
val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetched2.get.sizeInBytes > 0)
assert(fetched2.get.colStats.isEmpty)
} else {
checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
}
}
}
}
}

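A rough end-to-end illustration of what the datasource-table test above checks, written as a sketch against an assumed running SparkSession named spark (the table name and the DESCRIBE step are illustrative, not taken from the suite): with the flag on, a plain INSERT leaves the table with a refreshed sizeInBytes without re-running ANALYZE.

```scala
spark.sql("SET spark.sql.statistics.autoUpdate.size=true")
spark.sql("CREATE TABLE stats_demo (i INT, j STRING) USING PARQUET")
spark.sql("ANALYZE TABLE stats_demo COMPUTE STATISTICS") // seed the initial stats
spark.sql("INSERT INTO stats_demo VALUES (1, 'abc')")    // size is recomputed because the flag is on

// Table-level statistics, including the size in bytes, show up in the extended description.
spark.sql("DESCRIBE TABLE EXTENDED stats_demo").show(truncate = false)
```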
187 changes: 116 additions & 71 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -444,88 +444,133 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto

test("change stats after insert command for hive table") {
val table = s"change_stats_insert_hive_table"
withTable(table) {
sql(s"CREATE TABLE $table (i int, j string)")
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetched1.get.sizeInBytes == 0)
assert(fetched1.get.colStats.size == 2)

// insert into command
sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
assert(getStatsProperties(table).isEmpty)
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
withTable(table) {
sql(s"CREATE TABLE $table (i int, j string)")
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetched1.get.sizeInBytes == 0)
assert(fetched1.get.colStats.size == 2)

// insert into command
sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
if (autoUpdate) {
val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetched2.get.sizeInBytes > 0)
assert(fetched2.get.colStats.isEmpty)
val statsProp = getStatsProperties(table)
assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
} else {
assert(getStatsProperties(table).isEmpty)
}
}
}
}
}

test("change stats after load data command") {
val table = "change_stats_load_table"
withTable(table) {
sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetched1.get.sizeInBytes == 0)
assert(fetched1.get.colStats.size == 2)

withTempDir { loadPath =>
// load data command
val file = new File(loadPath + "/data")
val writer = new PrintWriter(file)
writer.write("2,xyz")
writer.close()
sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
assert(getStatsProperties(table).isEmpty)
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
withTable(table) {
sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetched1.get.sizeInBytes == 0)
assert(fetched1.get.colStats.size == 2)

withTempDir { loadPath =>
// load data command
val file = new File(loadPath + "/data")
val writer = new PrintWriter(file)
writer.write("2,xyz")
writer.close()
sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
if (autoUpdate) {
val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetched2.get.sizeInBytes > 0)
assert(fetched2.get.colStats.isEmpty)
val statsProp = getStatsProperties(table)
assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
} else {
assert(getStatsProperties(table).isEmpty)
}
}
}
}
}
}

test("change stats after add/drop partition command") {
val table = "change_stats_part_table"
withTable(table) {
sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
// table has two partitions initially
for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
}
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
assert(fetched1.get.sizeInBytes > 0)
assert(fetched1.get.colStats.size == 2)

withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
val file1 = new File(dir1 + "/data")
val writer1 = new PrintWriter(file1)
writer1.write("1,a")
writer1.close()

val file2 = new File(dir2 + "/data")
val writer2 = new PrintWriter(file2)
writer2.write("1,a")
writer2.close()

// add partition command
sql(
s"""
|ALTER TABLE $table ADD
|PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
|PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
""".stripMargin)
assert(getStatsProperties(table).isEmpty)

// generate stats again
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(4))
assert(fetched2.get.sizeInBytes > 0)
assert(fetched2.get.colStats.size == 2)

// drop partition command
sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION (hr='12')")
// only one partition left
assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
.map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
assert(getStatsProperties(table).isEmpty)
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
withTable(table) {
sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
// table has two partitions initially
for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
}
// analyze to get initial stats
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
assert(fetched1.get.sizeInBytes > 0)
assert(fetched1.get.colStats.size == 2)

withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
val file1 = new File(dir1 + "/data")
val writer1 = new PrintWriter(file1)
writer1.write("1,a")
writer1.close()

val file2 = new File(dir2 + "/data")
val writer2 = new PrintWriter(file2)
writer2.write("1,a")
writer2.close()

// add partition command
sql(
s"""
|ALTER TABLE $table ADD
|PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
|PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
""".stripMargin)
if (autoUpdate) {
val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetched2.get.sizeInBytes > fetched1.get.sizeInBytes)
assert(fetched2.get.colStats.isEmpty)
val statsProp = getStatsProperties(table)
assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
} else {
assert(getStatsProperties(table).isEmpty)
}

// now the table has four partitions, generate stats again
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
val fetched3 = checkTableStats(
table, hasSizeInBytes = true, expectedRowCounts = Some(4))
assert(fetched3.get.sizeInBytes > 0)
assert(fetched3.get.colStats.size == 2)

// drop partition command
sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION (hr='12')")
assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
.map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
// only one partition left
if (autoUpdate) {
val fetched4 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetched4.get.sizeInBytes < fetched1.get.sizeInBytes)
assert(fetched4.get.colStats.isEmpty)
val statsProp = getStatsProperties(table)
assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched4.get.sizeInBytes)
} else {
assert(getStatsProperties(table).isEmpty)
}
}
}
}
}
}
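For the partitioned Hive-table case exercised by the last test, the observable behaviour is that ADD PARTITION grows and DROP PARTITION shrinks the stored total size without another ANALYZE when the flag is on. A hedged sketch follows (assumes a SparkSession named spark with Hive support; the table name, partition values, and location path are invented):

```scala
spark.sql("SET spark.sql.statistics.autoUpdate.size=true")
spark.sql("CREATE TABLE part_demo (i INT, j STRING) PARTITIONED BY (ds STRING)")
spark.sql("INSERT OVERWRITE TABLE part_demo PARTITION (ds='2008-04-08') SELECT 1, 'a'")
spark.sql("ANALYZE TABLE part_demo COMPUTE STATISTICS") // seed the initial stats

// Adding a partition whose location already holds data grows sizeInBytes;
// dropping a partition shrinks it, in both cases without re-running ANALYZE.
spark.sql("ALTER TABLE part_demo ADD PARTITION (ds='2008-04-09') LOCATION '/tmp/part_demo_extra'")
spark.sql("ALTER TABLE part_demo DROP PARTITION (ds='2008-04-08')")
spark.sql("DESCRIBE TABLE EXTENDED part_demo").show(truncate = false)
```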