Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,17 @@ abstract class Catalog {
*/
def clearCache(): Unit

/**
* Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
* table, such as the location of blocks. When those change outside of Spark SQL, users should
* call this function to invalidate the cache.
*
* If this table is cached as an InMemoryRelation, drop the original cached version and make the
* new version cached lazily.
*
* @since 2.0.0
*/
def refreshTable(tableName: String): Unit

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,24 +126,9 @@ case class RefreshTable(tableIdent: TableIdentifier)
extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
// Refresh the given table's metadata first.
sparkSession.sessionState.catalog.refreshTable(tableIdent)

// If this table is cached as a InMemoryColumnarRelation, drop the original
// cached version and make the new version cached lazily.
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
val df = Dataset.ofRows(sparkSession, logicalPlan)
// Uncache the logicalPlan.
sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
}

// Refresh the given table's metadata. If this table is cached as an InMemoryRelation,
// drop the original cached version and make the new version cached lazily.
sparkSession.catalog.refreshTable(tableIdent.quotedString)
Seq.empty[Row]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
}

/**
* Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
* is refreshed.
*
* @group cachemgmt
* @since 2.0.0
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
sessionCatalog.refreshTable(tableIdent)

// If this table is cached as a InMemoryRelation, drop the original
// cached version and make the new version cached lazily.
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
val df = Dataset.ofRows(sparkSession, logicalPlan)
// Uncache the logicalPlan.
sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
}
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {

def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)

def refreshTable(tableName: String): Unit = {
catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
}

def invalidateTable(tableName: String): Unit = {
catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet")

sessionState.refreshTable("arrayInParquet")
sparkSession.catalog.refreshTable("arrayInParquet")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we don't call refreshTable through SessionState, do we still need to keep SessionState.refreshTable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I also want to remove invalidateTable, which is a duplicate name of refreshTable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, invalidateTable and refreshTable do have different meanings. The current implementation of HiveMetastoreCatalog.refreshTable is HiveMetastoreCatalog.invalidateTable (and then we retrieve the new metadata lazily). But, it does not mean that refreshTable and invalidateTable have the same semantic. If we should remove any of invalidateTable or refreshTable should be discussed in a different thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thanks!


checkAnswer(
sql("SELECT a FROM arrayInParquet"),
Expand Down Expand Up @@ -681,7 +681,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.mode(SaveMode.Append)
.saveAsTable("mapInParquet")

sessionState.refreshTable("mapInParquet")
sparkSession.catalog.refreshTable("mapInParquet")

checkAnswer(
sql("SELECT a FROM mapInParquet"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle

df.write.parquet(s"$path/p=2")
sql("ALTER TABLE t ADD PARTITION (p=2)")
hiveContext.sessionState.refreshTable("t")
spark.catalog.refreshTable("t")
checkAnswer(
spark.table("t"),
df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
Expand Down Expand Up @@ -249,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle

df.write.parquet(s"$path/p=2")
sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
hiveContext.sessionState.refreshTable(s"$db.t")
spark.catalog.refreshTable(s"$db.t")
checkAnswer(
spark.table(s"$db.t"),
df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,16 @@ class HiveContext private[hive](
sparkSession.sharedState.asInstanceOf[HiveSharedState]
}

/**
* Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
* table, such as the location of blocks. When those change outside of Spark SQL, users should
* call this function to invalidate the cache.
*
* @since 1.3.0
*/
def refreshTable(tableName: String): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if invalidateTable has different meaning than refreshTable, should we also add it to HiveContext? cc @yhuai

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is for the compatibility purpose. Let's leave it as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

sparkSession.catalog.refreshTable(tableName)
}

}