Commit 78b9ffc

analyze all tables in a specific database

1 parent 73412ff commit 78b9ffc

7 files changed, +129 −0 lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 0 deletions

@@ -134,6 +134,8 @@ statement
         (AS? query)? #replaceTable
     | ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
+    | ANALYZE TABLES ((FROM | IN) multipartIdentifier)? COMPUTE STATISTICS
+        (identifier)? #analyzeTables
     | ALTER TABLE multipartIdentifier
         ADD (COLUMN | COLUMNS)
         columns=qualifiedColTypeWithPositionList #addTableColumns
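
The new #analyzeTables alternative makes the FROM/IN namespace clause optional and, like #analyze, allows one optional trailing identifier, which the visitor below validates as NOSCAN. A minimal sketch of the accepted forms, assuming a SparkSession named `spark`; the database name `mydb` is illustrative:

// Statements the new rule admits.
spark.sql("ANALYZE TABLES COMPUTE STATISTICS")                   // current database, full scan
spark.sql("ANALYZE TABLES IN mydb COMPUTE STATISTICS")           // explicit database
spark.sql("ANALYZE TABLES FROM mydb COMPUTE STATISTICS NOSCAN")  // sizes only, no row counts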

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 19 additions & 0 deletions

@@ -3547,6 +3547,25 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging {
     }
   }

+  /**
+   * Create an [[AnalyzeTables]].
+   * Example SQL for analyzing all tables in default database:
+   * {{{
+   *   ANALYZE TABLES IN default COMPUTE STATISTICS;
+   * }}}
+   */
+  override def visitAnalyzeTables(ctx: AnalyzeTablesContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.identifier != null &&
+        ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+      throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`",
+        ctx.identifier())
+    }
+    val multiPart = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier)
+    AnalyzeTables(
+      UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])),
+      noScan = ctx.identifier != null)
+  }
+
   /**
    * Create a [[RepairTableStatement]].
    *
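
Because the grammar captures the trailing word as a generic identifier, the visitor rejects anything other than NOSCAN at parse time. A minimal round-trip sketch, assuming a SparkSession named `spark`:

// Parsing the statement yields the unresolved AnalyzeTables node built above.
val plan = spark.sessionState.sqlParser.parsePlan(
  "ANALYZE TABLES IN default COMPUTE STATISTICS NOSCAN")
// plan: AnalyzeTables(UnresolvedNamespace(Seq("default")), noScan = true)
println(plan)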

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 9 additions & 0 deletions

@@ -602,6 +602,15 @@ case class AnalyzeTable(
   override def children: Seq[LogicalPlan] = child :: Nil
 }

+/**
+ * The logical plan of the ANALYZE TABLES command.
+ */
+case class AnalyzeTables(
+    namespace: LogicalPlan,
+    noScan: Boolean) extends Command {
+  override def children: Seq[LogicalPlan] = Seq(namespace)
+}
+
 /**
  * The logical plan of the ANALYZE TABLE FOR COLUMNS command.
  */
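
Keeping the namespace as a child LogicalPlan lets the analyzer resolve the UnresolvedNamespace placeholder with the ordinary resolution rules before the command runs. A small sketch of constructing the node the way the parser does:

import org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace

// The namespace is a child, so the analyzer visits and resolves it.
val node = AnalyzeTables(UnresolvedNamespace(Seq("default")), noScan = false)
assert(node.children.head == node.namespace)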

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 9 additions & 0 deletions

@@ -1898,6 +1898,15 @@ class DDLParserSuite extends AnalysisTest {
       "Expected `NOSCAN` instead of `xxxx`")
   }

+  test("analyze tables statistics") {
+    comparePlans(parsePlan("analyze tables in a.b.c compute statistics"),
+      AnalyzeTables(UnresolvedNamespace(Seq("a", "b", "c")), noScan = false))
+    comparePlans(parsePlan("analyze tables in a compute statistics noscan"),
+      AnalyzeTables(UnresolvedNamespace(Seq("a")), noScan = true))
+    intercept("analyze tables in a.b.c compute statistics xxxx",
+      "Expected `NOSCAN` instead of `xxxx`")
+  }
+
   test("analyze table column statistics") {
     intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR COLUMNS", "")
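
Outside the test harness the same failure surfaces as a ParseException. A hedged sketch, assuming a SparkSession named `spark`:

import org.apache.spark.sql.catalyst.parser.ParseException

// Any trailing identifier other than NOSCAN fails at parse time.
try {
  spark.sessionState.sqlParser.parsePlan(
    "analyze tables in a compute statistics xxxx")
} catch {
  case e: ParseException =>
    assert(e.getMessage.contains("Expected `NOSCAN` instead of `xxxx`"))
}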

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 7 additions & 0 deletions

@@ -401,6 +401,13 @@ class ResolveSessionCatalog(
       AnalyzePartitionCommand(ident.asTableIdentifier, partitionSpec, noScan)
     }

+    case AnalyzeTables(SessionCatalogAndNamespace(_, ns), noScan) =>
+      if (ns.length > 1) {
+        throw new AnalysisException(
+          s"The database name is not valid: ${ns.quoted}")
+      }
+      AnalyzeTablesCommand(ns.headOption, noScan)
+
    case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
      AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)
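
The v1 session catalog only understands single-part database names, so a nested namespace is rejected during analysis rather than silently truncated. A sketch of the failure mode, assuming a SparkSession named `spark`:

import org.apache.spark.sql.AnalysisException

// A multi-part namespace cannot map onto a v1 database.
try {
  spark.sql("ANALYZE TABLES IN a.b.c COMPUTE STATISTICS")
} catch {
  case e: AnalysisException =>
    assert(e.getMessage.contains("The database name is not valid: a.b.c"))
}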

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+
+
+/**
+ * Analyzes all tables in the given database to generate statistics.
+ */
+case class AnalyzeTablesCommand(
+    databaseName: Option[String],
+    noScan: Boolean) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+
+    val catalog = sparkSession.sessionState.catalog
+    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+    catalog.listTables(db).foreach { tbl =>
+      try {
+        val tableMeta = catalog.getTableMetadata(tbl)
+        if (tableMeta.tableType == CatalogTableType.MANAGED ||
+            tableMeta.tableType == CatalogTableType.EXTERNAL) {
+          // Compute stats for the whole table
+          val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+          val tableIdentWithDB = TableIdentifier(tbl.table, Some(db))
+          val newRowCount =
+            if (noScan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+
+          // Update the metastore if the above statistics of the table are different from those
+          // recorded in the metastore.
+          val newStats =
+            CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
+          if (newStats.isDefined) {
+            catalog.alterTableStats(tableIdentWithDB, newStats)
+          }
+        }
+      } catch {
+        case e: Exception =>
+          logError(s"Failed to analyze table: ${tbl.identifier}.", e)
+      }
+    }
+
+    Seq.empty[Row]
+  }
+}
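
run iterates every table in the database, skips anything that is not MANAGED or EXTERNAL (views, for instance), and only writes to the metastore when the freshly computed statistics differ from the recorded ones; a failure on one table is logged and the loop continues. A sketch of the observable effect, assuming a SparkSession named `spark` and an illustrative table `t1`:

// After the command, the catalog entry for each table carries statistics.
spark.range(10).write.saveAsTable("t1")
spark.sql("ANALYZE TABLES IN default COMPUTE STATISTICS")
spark.sql("DESC EXTENDED t1")
  .filter("col_name = 'Statistics'")
  .show(truncate = false)   // e.g. size in bytes and "10 rows"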

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

Lines changed: 20 additions & 0 deletions

@@ -671,4 +671,24 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSparkSession {
       }
     }
   }
+
+  test("analyze all tables in a specific database") {
+    withTempDir { dir =>
+      withTable("t1", "t2") {
+        spark.range(10).write.saveAsTable("t1")
+        sql(s"CREATE EXTERNAL TABLE t2 USING parquet LOCATION '${dir.toURI}' " +
+          "AS SELECT * FROM range(20)")
+        withView("v1") {
+          sql(s"CREATE VIEW v1 AS SELECT * FROM t1")
+          sql(s"ANALYZE TABLES IN default COMPUTE STATISTICS NOSCAN")
+          checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = None)
+          checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = None)
+
+          sql(s"ANALYZE TABLES COMPUTE STATISTICS")
+          checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = Some(10))
+          checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = Some(20))
+        }
+      }
+    }
+  }
 }
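
The view v1 exists to pin down the skip path: a view is neither MANAGED nor EXTERNAL, so the command leaves it alone while t1 and t2 gain stats. A sketch of checking that directly, assuming the same session:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// A view's table type keeps it out of the command's MANAGED/EXTERNAL filter.
val meta = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("v1", Some("default")))
assert(meta.tableType == CatalogTableType.VIEW)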
