Commit 2e62560

cloud-fan authored and gatorsmile committed
[SPARK-19265][SQL] make table relation cache general and does not depend on hive
## What changes were proposed in this pull request?

We have a table relation plan cache in `HiveMetastoreCatalog`, which caches a lot of things: file status, resolved data source, inferred schema, etc. However, it doesn't make sense to tie this cache to Hive support; we should move it to the SQL core module so that users can use the cache without Hive support. It also reduces the size of `HiveMetastoreCatalog`, making it easier to remove eventually.

Main changes:
1. Move the table relation cache to `SessionCatalog`.
2. `SessionCatalog.lookupRelation` now returns `SimpleCatalogRelation`, and the analyzer converts it to `LogicalRelation` or `MetastoreRelation` later, so `HiveSessionCatalog` no longer needs to override `lookupRelation`.
3. `FindDataSourceTable` reads and writes the table relation cache.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #16621 from cloud-fan/plan-cache.
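As a rough, self-contained sketch of the caching pattern this commit introduces (the `QualifiedTableName` key, the Guava `Cache`, and the Callable-on-miss lookup mirror the diff below; the surrounding object and the `resolve` parameter are illustrative only, not Spark API):

```scala
import java.util.concurrent.Callable

import com.google.common.cache.{Cache, CacheBuilder}

import org.apache.spark.sql.catalyst.QualifiedTableName
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object RelationCacheSketch {
  // Bounded cache keyed by (database, table), as added to SessionCatalog in this commit.
  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()

  // On a miss, `resolve` builds the relation plan and the result is stored under `key`;
  // later lookups for the same key return the cached plan without re-resolving.
  def getOrResolve(key: QualifiedTableName, resolve: () => LogicalPlan): LogicalPlan =
    tableRelationCache.get(key, new Callable[LogicalPlan] {
      override def call(): LogicalPlan = resolve()
    })
}
```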
1 parent 0c92318 commit 2e62560

File tree

20 files changed: +144 -198 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 24 additions & 9 deletions

@@ -21,13 +21,13 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
 
+import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}

@@ -117,6 +117,14 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase
   }
 
+  /**
+   * A cache of qualified table name to table relation plan.
+   */
+  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
+    // TODO: create a config instead of hardcode 1000 here.
+    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()
+  }
+
   /**
   * This method is used to make the given path qualified before we
   * store this path in the underlying external catalog. So, when a path

@@ -573,7 +581,7 @@ class SessionCatalog(
       val relationAlias = alias.getOrElse(table)
       if (db == globalTempViewManager.database) {
         globalTempViewManager.get(table).map { viewDef =>
-          SubqueryAlias(relationAlias, viewDef, Some(name))
+          SubqueryAlias(relationAlias, viewDef, None)
         }.getOrElse(throw new NoSuchTableException(db, table))
       } else if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)

@@ -586,12 +594,12 @@ class SessionCatalog(
             desc = metadata,
             output = metadata.schema.toAttributes,
             child = parser.parsePlan(viewText))
-          SubqueryAlias(relationAlias, child, Option(name))
+          SubqueryAlias(relationAlias, child, Some(name.copy(table = table, database = Some(db))))
        } else {
          SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None)
        }
      } else {
-      SubqueryAlias(relationAlias, tempTables(table), Option(name))
+      SubqueryAlias(relationAlias, tempTables(table), None)
     }
   }
 }

@@ -651,14 +659,21 @@ class SessionCatalog(
   * Refresh the cache entry for a metastore table, if any.
   */
  def refreshTable(name: TableIdentifier): Unit = synchronized {
+    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
+    val tableName = formatTableName(name.table)
+
    // Go through temporary tables and invalidate them.
-    // If the database is defined, this is definitely not a temp table.
+    // If the database is defined, this may be a global temporary view.
    // If the database is not defined, there is a good chance this is a temp table.
    if (name.database.isEmpty) {
-      tempTables.get(formatTableName(name.table)).foreach(_.refresh())
-    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
-      globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
+      tempTables.get(tableName).foreach(_.refresh())
+    } else if (dbName == globalTempViewManager.database) {
+      globalTempViewManager.get(tableName).foreach(_.refresh())
    }
+
+    // Also invalidate the table relation cache.
+    val qualifiedTableName = QualifiedTableName(dbName, tableName)
+    tableRelationCache.invalidate(qualifiedTableName)
  }
 
  /**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala

Lines changed: 3 additions & 1 deletion

@@ -60,9 +60,11 @@ case class TableIdentifier(table: String, database: Option[String])
   override val identifier: String = table
 
   def this(table: String) = this(table, None)
-
 }
 
+/** A fully qualified identifier for a table (i.e., database.tableName) */
+case class QualifiedTableName(database: String, name: String)
+
 object TableIdentifier {
   def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
 }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 3 additions & 3 deletions

@@ -436,7 +436,7 @@ class SessionCatalogSuite extends PlanTest {
       == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
     // Otherwise, we'll first look up a temporary table with the same name
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
-      == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
+      == SubqueryAlias("tbl1", tempTable1, None))
     // Then, if that does not exist, look up the relation in the current database
     sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))

@@ -462,7 +462,7 @@
     val tmpView = Range(1, 10, 2, 10)
     catalog.createTempView("vw1", tmpView, overrideIfExists = false)
     val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
-    assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
+    assert(plan == SubqueryAlias("range", tmpView, None))
   }
 
   test("look up view relation") {

@@ -479,7 +479,7 @@
     // Look up a view using current database of the session catalog.
     sessionCatalog.setCurrentDatabase("db3")
     comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")),
-      SubqueryAlias("view1", view, Some(TableIdentifier("view1"))))
+      SubqueryAlias("view1", view, Some(TableIdentifier("view1", Some("db3")))))
   }
 
   test("table exists") {

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 3 additions & 1 deletion

@@ -386,7 +386,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
         relation.catalogTable.identifier
     }
-    EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
+
+    val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
+    EliminateSubqueryAliases(tableRelation) match {
       // check if the table is a data source table (the relation is a BaseRelation).
       case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
         throw new AnalysisException(
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala

Lines changed: 2 additions & 1 deletion

@@ -40,7 +40,8 @@ case class AnalyzeColumnCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+    val relation =
+      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
 
     // Compute total size
     val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala

Lines changed: 2 additions & 1 deletion

@@ -41,7 +41,8 @@ case class AnalyzeTableCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+    val relation =
+      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
 
     relation match {
       case relation: CatalogRelation =>

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 1 addition & 1 deletion

@@ -450,7 +450,7 @@ case class DescribeTableCommand(
     if (metadata.schema.isEmpty) {
       // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
       // inferred at runtime. We should still support it.
-      describeSchema(catalog.lookupRelation(metadata.identifier).schema, result)
+      describeSchema(sparkSession.table(metadata.identifier).schema, result)
     } else {
       describeSchema(metadata.schema, result)
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 33 additions & 27 deletions

@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import scala.collection.mutable.ArrayBuffer
+import java.util.concurrent.Callable
 
-import org.apache.hadoop.fs.Path
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation

@@ -37,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPa
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

@@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
 /**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data
- * source information.
+ * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive.
 */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(
-      sparkSession: SparkSession,
-      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
-    val table = simpleCatalogRelation.catalogTable
-    val pathOption = table.storage.locationUri.map("path" -> _)
-    val dataSource =
-      DataSource(
-        sparkSession,
-        userSpecifiedSchema = Some(table.schema),
-        partitionColumns = table.partitionColumnNames,
-        bucketSpec = table.bucketSpec,
-        className = table.provider.get,
-        options = table.storage.properties ++ pathOption)
-
-    LogicalRelation(
-      dataSource.resolveRelation(),
-      expectedOutputAttributes = Some(simpleCatalogRelation.output),
-      catalogTable = Some(table))
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+    val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
+    val cache = sparkSession.sessionState.catalog.tableRelationCache
+    val withHiveSupport =
+      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
+
+    cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+      override def call(): LogicalPlan = {
+        val pathOption = table.storage.locationUri.map("path" -> _)
+        val dataSource =
+          DataSource(
+            sparkSession,
+            // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
+            // inferred at runtime. We should still support it.
+            userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+            partitionColumns = table.partitionColumnNames,
+            bucketSpec = table.bucketSpec,
+            className = table.provider.get,
+            options = table.storage.properties ++ pathOption,
+            // TODO: improve `InMemoryCatalog` and remove this limitation.
+            catalogTable = if (withHiveSupport) Some(table) else None)
+
+        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
+      }
+    })
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s))
+      i.copy(table = readDataSourceTable(s.metadata))
 
     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s)
+      readDataSourceTable(s.metadata)
   }
 }
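For illustration only (a minimal sketch, assuming a running `SparkSession` named `spark` and an already-saved data source table `db.t`), the new cache can be observed through the session catalog, and `refreshTable` drops the cached plan so the next lookup re-resolves it:

```scala
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}

// Analyzing a query over the table lets FindDataSourceTable populate the relation cache.
spark.table("db.t").queryExecution.analyzed

// The cached plan, if any, is visible under its qualified name; getIfPresent returns null on a miss.
val key = QualifiedTableName("db", "t")
val cached = spark.sessionState.catalog.tableRelationCache.getIfPresent(key)

// refreshTable invalidates the entry, so the next lookup rebuilds file listings, schema, etc.
spark.sessionState.catalog.refreshTable(TableIdentifier("t", Some("db")))
```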

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 1 addition & 1 deletion

@@ -440,7 +440,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
     // If this table is cached as an InMemoryRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
+    val logicalPlan = sparkSession.table(tableIdent).queryExecution.analyzed
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 0 additions & 11 deletions

@@ -1626,17 +1626,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(d.size == d.distinct.size)
   }
 
-  test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
-    val tableName = "tbl"
-    withTable(tableName) {
-      spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
-      val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
-      val expr = relation.resolve("i")
-      val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
-      qe.assertAnalyzed()
-    }
-  }
-
   private def verifyNullabilityInFilterExec(
       df: DataFrame,
       expr: String,
