From b1cd1c670adbd0db3dcb82831b4aacae514c37f1 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 17 May 2016 13:38:22 -0700
Subject: [PATCH 1/9] initial fix

---
 .../org/apache/spark/sql/SQLContext.scala    | 13 ++++++
 .../spark/sql/hive/MultiDatabaseSuite.scala  | 40 ++++++++++---------
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c64e284635ec..e6b1bc1f437a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -294,6 +294,19 @@ class SQLContext private[sql](
     sparkSession.catalog.clearCache()
   }
 
+  /**
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+   * Spark SQL or the external data source library it uses might cache certain metadata about a
+   * table, such as the location of blocks. When those change outside of Spark SQL, users should
+   * call this function to invalidate the cache.
+   *
+   * @since 1.3.0
+   */
+  def refreshTable(tableName: String): Unit = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    sessionState.catalog.refreshTable(tableIdent)
+  }
+
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
   /**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 6c9ce208dbd6..23114af7a3c3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -202,7 +202,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
     activateDatabase(db) {
       sql(
-        s"""CREATE EXTERNAL TABLE t (id BIGINT)
+        s"""
+           |CREATE EXTERNAL TABLE t (id BIGINT)
            |PARTITIONED BY (p INT)
            |STORED AS PARQUET
            |LOCATION '$path'
@@ -217,7 +218,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       df.write.parquet(s"$path/p=2")
       sql("ALTER TABLE t ADD PARTITION (p=2)")
 
-      hiveContext.sessionState.refreshTable("t")
+      hiveContext.refreshTable("t")
       checkAnswer(
         spark.table("t"),
         df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -234,11 +235,12 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       val path = dir.getCanonicalPath
 
       sql(
-        s"""CREATE EXTERNAL TABLE $db.t (id BIGINT)
-           |PARTITIONED BY (p INT)
-           |STORED AS PARQUET
-           |LOCATION '$path'
-         """.stripMargin)
+        s"""
+           |CREATE EXTERNAL TABLE $db.t (id BIGINT)
+           |PARTITIONED BY (p INT)
+           |STORED AS PARQUET
+           |LOCATION '$path'
+         """.stripMargin)
 
       checkAnswer(spark.table(s"$db.t"), spark.emptyDataFrame)
 
@@ -249,7 +251,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
      df.write.parquet(s"$path/p=2")
      sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
 
-      hiveContext.sessionState.refreshTable(s"$db.t")
+      hiveContext.refreshTable(s"$db.t")
       checkAnswer(
         spark.table(s"$db.t"),
         df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -279,11 +281,11 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       val message = intercept[AnalysisException] {
         sql(
           s"""
-            |CREATE TABLE `d:b`.`t:a` (a int)
-            |USING parquet
-            |OPTIONS (
-            |  path '$path'
-            |)
+             |CREATE TABLE `d:b`.`t:a` (a int)
+             |USING parquet
+             |OPTIONS (
+             |  path '$path'
+             |)
""".stripMargin) }.getMessage assert(message.contains("is not a valid name for metastore")) @@ -293,12 +295,12 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle val message = intercept[AnalysisException] { sql( s""" - |CREATE TABLE `d:b`.`table` (a int) - |USING parquet - |OPTIONS ( - | path '$path' - |) - """.stripMargin) + |CREATE TABLE `d:b`.`table` (a int) + |USING parquet + |OPTIONS ( + | path '$path' + |) + """.stripMargin) }.getMessage assert(message.contains("is not a valid name for metastore")) } From bdd7c61b9d1d7fb1b839485c10f82300304860c1 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 17 May 2016 14:14:58 -0700 Subject: [PATCH 2/9] fix. --- .../main/scala/org/apache/spark/sql/SQLContext.scala | 3 +-- .../scala/org/apache/spark/sql/catalog/Catalog.scala | 10 ++++++++++ .../org/apache/spark/sql/internal/CatalogImpl.scala | 11 +++++++++++ .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 4 ++-- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index e6b1bc1f437a..a6ccd0c21db6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -303,8 +303,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def refreshTable(tableName: String): Unit = { - val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - sessionState.catalog.refreshTable(tableIdent) + sparkSession.catalog.refreshTable(tableName) } // scalastyle:off diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 49c07427617f..ab82f75360c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -211,4 +211,14 @@ abstract class Catalog { */ def clearCache(): Unit + /** + * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, + * Spark SQL or the external data source library it uses might cache certain metadata about a + * table, such as the location of blocks. When those change outside of Spark SQL, users should + * call this function to invalidate the cache. + * + * @since 2.0.0 + */ + def refreshTable(tableName: String): Unit + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 473e827f4db1..2dde86cf8176 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -345,6 +345,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { sparkSession.cacheManager.lookupCachedData(qName).nonEmpty } + /** + * Refresh the cache entry for a metastore table, if any. 
+ * + * @group cachemgmt + * @since 2.0.0 + */ + override def refreshTable(tableName: String): Unit = { + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + sessionCatalog.refreshTable(tableIdent) + } + } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 676fbd0a39b6..e82fa837fb48 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -622,7 +622,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .mode(SaveMode.Append) .saveAsTable("arrayInParquet") - sessionState.refreshTable("arrayInParquet") + refreshTable("arrayInParquet") checkAnswer( sql("SELECT a FROM arrayInParquet"), @@ -681,7 +681,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .mode(SaveMode.Append) .saveAsTable("mapInParquet") - sessionState.refreshTable("mapInParquet") + refreshTable("mapInParquet") checkAnswer( sql("SELECT a FROM mapInParquet"), From e3564d5dff530ce84a28d7ed90a4ff4bac7de46b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 17 May 2016 14:29:28 -0700 Subject: [PATCH 3/9] fix again. --- python/pyspark/sql/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index ca111ae9bb7d..954fd25aea17 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -479,7 +479,7 @@ def refreshTable(self, tableName): location of blocks. When those change outside of Spark SQL, users should call this function to invalidate the cache. """ - self._ssql_ctx.refreshTable(tableName) + self.sparkSession.catalog.refreshTable(tableName) class UDFRegistration(object): From c3f3f0b481c5a3fe3b2485ab0d73194dd7898911 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 17 May 2016 15:03:33 -0700 Subject: [PATCH 4/9] revert it back --- python/pyspark/sql/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 954fd25aea17..ca111ae9bb7d 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -479,7 +479,7 @@ def refreshTable(self, tableName): location of blocks. When those change outside of Spark SQL, users should call this function to invalidate the cache. """ - self.sparkSession.catalog.refreshTable(tableName) + self._ssql_ctx.refreshTable(tableName) class UDFRegistration(object): From 5f342f729bafd8abfc3c14f85541c2bf907a4523 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 17 May 2016 20:47:53 -0700 Subject: [PATCH 5/9] address comments. 
---
 .../org/apache/spark/sql/SQLContext.scala    | 12 -----
 .../sql/hive/MetastoreDataSourcesSuite.scala |  4 +-
 .../spark/sql/hive/MultiDatabaseSuite.scala  | 40 +++++++--------
 sql/hivecontext-compatibility/pom.xml        |  7 +++
 .../apache/spark/sql/hive/HiveContext.scala  | 12 +++++
 .../hive/HiveContextCompatibilitySuite.scala | 49 +++++++++++++++++--
 6 files changed, 86 insertions(+), 38 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a6ccd0c21db6..c64e284635ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -294,18 +294,6 @@ class SQLContext private[sql](
     sparkSession.catalog.clearCache()
   }
 
-  /**
-   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
-   * Spark SQL or the external data source library it uses might cache certain metadata about a
-   * table, such as the location of blocks. When those change outside of Spark SQL, users should
-   * call this function to invalidate the cache.
-   *
-   * @since 1.3.0
-   */
-  def refreshTable(tableName: String): Unit = {
-    sparkSession.catalog.refreshTable(tableName)
-  }
-
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
   /**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 4f014529e1e5..ad7e30e25d7e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -622,7 +622,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       .mode(SaveMode.Append)
       .saveAsTable("arrayInParquet")
 
-    refreshTable("arrayInParquet")
+    sparkSession.catalog.refreshTable("arrayInParquet")
 
     checkAnswer(
       sql("SELECT a FROM arrayInParquet"),
@@ -681,7 +681,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       .mode(SaveMode.Append)
       .saveAsTable("mapInParquet")
 
-    refreshTable("mapInParquet")
+    sparkSession.catalog.refreshTable("mapInParquet")
 
     checkAnswer(
       sql("SELECT a FROM mapInParquet"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 23114af7a3c3..ca48825cd334 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -202,8 +202,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
     activateDatabase(db) {
       sql(
-        s"""
-           |CREATE EXTERNAL TABLE t (id BIGINT)
+        s"""CREATE EXTERNAL TABLE t (id BIGINT)
            |PARTITIONED BY (p INT)
            |STORED AS PARQUET
            |LOCATION '$path'
@@ -218,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       df.write.parquet(s"$path/p=2")
       sql("ALTER TABLE t ADD PARTITION (p=2)")
 
-      hiveContext.refreshTable("t")
+      spark.catalog.refreshTable("t")
       checkAnswer(
         spark.table("t"),
         df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -235,12 +234,11 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       val path = dir.getCanonicalPath
 
       sql(
-        s"""
-           |CREATE EXTERNAL TABLE $db.t (id BIGINT)
-           |PARTITIONED BY (p INT)
-           |STORED AS PARQUET
-           |LOCATION '$path'
-         """.stripMargin)
+        s"""CREATE EXTERNAL TABLE $db.t (id BIGINT)
+           |PARTITIONED BY (p INT)
+           |STORED AS PARQUET
+           |LOCATION '$path'
+         """.stripMargin)
 
       checkAnswer(spark.table(s"$db.t"), spark.emptyDataFrame)
 
@@ -251,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       df.write.parquet(s"$path/p=2")
       sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
 
-      hiveContext.refreshTable(s"$db.t")
+      spark.catalog.refreshTable(s"$db.t")
       checkAnswer(
         spark.table(s"$db.t"),
         df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -281,11 +279,11 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       val message = intercept[AnalysisException] {
         sql(
           s"""
-             |CREATE TABLE `d:b`.`t:a` (a int)
-             |USING parquet
-             |OPTIONS (
-             |  path '$path'
-             |)
+            |CREATE TABLE `d:b`.`t:a` (a int)
+            |USING parquet
+            |OPTIONS (
+            |  path '$path'
+            |)
           """.stripMargin)
       }.getMessage
       assert(message.contains("is not a valid name for metastore"))
@@ -295,12 +293,12 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       val message = intercept[AnalysisException] {
         sql(
           s"""
-             |CREATE TABLE `d:b`.`table` (a int)
-             |USING parquet
-             |OPTIONS (
-             |  path '$path'
-             |)
-           """.stripMargin)
+            |CREATE TABLE `d:b`.`table` (a int)
+            |USING parquet
+            |OPTIONS (
+            |  path '$path'
+            |)
+          """.stripMargin)
       }.getMessage
       assert(message.contains("is not a valid name for metastore"))
     }
diff --git a/sql/hivecontext-compatibility/pom.xml b/sql/hivecontext-compatibility/pom.xml
index ed9ef8e27919..72c39a151077 100644
--- a/sql/hivecontext-compatibility/pom.xml
+++ b/sql/hivecontext-compatibility/pom.xml
@@ -48,6 +48,13 @@
       test-jar
       test
+
+      org.apache.spark
+      spark-sql_${scala.binary.version}
+      test-jar
+      ${project.version}
+      test
+
diff --git a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 75166f6beaa8..415d4c0049d4 100644
--- a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -58,4 +58,16 @@ class HiveContext private[hive](
     sparkSession.sharedState.asInstanceOf[HiveSharedState]
   }
 
+  /**
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+   * Spark SQL or the external data source library it uses might cache certain metadata about a
+   * table, such as the location of blocks. When those change outside of Spark SQL, users should
+   * call this function to invalidate the cache.
+ * + * @since 1.3.0 + */ + def refreshTable(tableName: String): Unit = { + sparkSession.catalog.refreshTable(tableName) + } + } diff --git a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala index 5df674d60e9c..029e8d91251c 100644 --- a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala +++ b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala @@ -20,12 +20,17 @@ package org.apache.spark.sql.hive import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.util.Utils -class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach { +class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach + with SQLTestUtils { private var sc: SparkContext = null private var hc: HiveContext = null + protected var spark: SparkSession = _ override def beforeAll(): Unit = { super.beforeAll() @@ -34,6 +39,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac sc.hadoopConfiguration.set(k, v) } hc = new HiveContext(sc) + spark = hc.sparkSession } override def afterEach(): Unit = { @@ -66,7 +72,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac val res = df3.collect() val expected = Seq((18, 18, 8)).toDF("a", "x", "b").collect() assert(res.toSeq == expected.toSeq) - df3.registerTempTable("mai_table") + df3.createOrReplaceTempView("mai_table") val df4 = hc.table("mai_table") val res2 = df4.collect() assert(res2.toSeq == expected.toSeq) @@ -82,7 +88,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac val databases2 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) assert(databases2.toSet == Set("default", "mee_db")) val df = (1 to 10).map { i => ("bob" + i.toString, i) }.toDF("name", "age") - df.registerTempTable("mee_table") + df.createOrReplaceTempView("mee_table") hc.sql("CREATE TABLE moo_table (name string, age int)") hc.sql("INSERT INTO moo_table SELECT * FROM mee_table") assert( @@ -99,4 +105,41 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac assert(databases3.toSeq == Seq("default")) } + test("check change after refresh") { + val _hc = hc + import _hc.implicits._ + + withTempPath { tempDir => + withTable("jsonTable") { + (("a", "b") :: Nil).toDF().toJSON.rdd.saveAsTextFile(tempDir.getCanonicalPath) + + hc.sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) + + assert( + hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a", "b") :: Nil) + + Utils.deleteRecursively(tempDir) + (("a1", "b1", "c1") :: Nil).toDF().toJSON.rdd.saveAsTextFile(tempDir.getCanonicalPath) + + // Schema is cached so the new column does not show. The updated values in existing columns + // will show. 
+ assert( + hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a1", "b1") :: Nil) + + hc.refreshTable("jsonTable") + + // Check that the refresh worked + assert( + hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a1", "b1", "c1") :: Nil) + } + } + } + } From 7142ef54fbc806c12859f2af152794af5d50ec72 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 18 May 2016 11:27:24 -0700 Subject: [PATCH 6/9] address comments --- .../apache/spark/sql/catalog/Catalog.scala | 3 +++ .../spark/sql/execution/datasources/ddl.scala | 21 +++---------------- .../spark/sql/internal/CatalogImpl.scala | 15 +++++++++++++ .../spark/sql/internal/SessionState.scala | 3 +++ 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index ab82f75360c3..a99bc3bff6ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -217,6 +217,9 @@ abstract class Catalog { * table, such as the location of blocks. When those change outside of Spark SQL, users should * call this function to invalidate the cache. * + * If this table is cached as an InMemoryRelation, drop the original cached version and make the + * new version cached lazily. + * * @since 2.0.0 */ def refreshTable(tableName: String): Unit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 68238dbb46e9..78b1db16826e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -126,24 +126,9 @@ case class RefreshTable(tableIdent: TableIdentifier) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - // Refresh the given table's metadata first. - sparkSession.sessionState.catalog.refreshTable(tableIdent) - - // If this table is cached as a InMemoryColumnarRelation, drop the original - // cached version and make the new version cached lazily. - val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent) - // Use lookupCachedData directly since RefreshTable also takes databaseName. - val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty - if (isCached) { - // Create a data frame to represent the table. - // TODO: Use uncacheTable once it supports database name. - val df = Dataset.ofRows(sparkSession, logicalPlan) - // Uncache the logicalPlan. - sparkSession.cacheManager.tryUncacheQuery(df, blocking = true) - // Cache it again. - sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table)) - } - + // Refresh the given table's metadata. If this table is cached as an InMemoryRelation, + // drop the original cached version and make the new version cached lazily. 
+ sparkSession.catalog.refreshTable(tableIdent.quotedString) Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 2dde86cf8176..5b8bb8ac951e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -354,6 +354,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { override def refreshTable(tableName: String): Unit = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) sessionCatalog.refreshTable(tableIdent) + + // If this table is cached as a InMemoryRelation, drop the original + // cached version and make the new version cached lazily. + val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent) + // Use lookupCachedData directly since RefreshTable also takes databaseName. + val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty + if (isCached) { + // Create a data frame to represent the table. + // TODO: Use uncacheTable once it supports database name. + val df = Dataset.ofRows(sparkSession, logicalPlan) + // Uncache the logicalPlan. + sparkSession.cacheManager.tryUncacheQuery(df, blocking = true) + // Cache it again. + sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table)) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index f0b8a83dee8c..2db841a0665e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -163,6 +163,9 @@ private[sql] class SessionState(sparkSession: SparkSession) { def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan) def refreshTable(tableName: String): Unit = { + // Different from SparkSession.catalog.refreshTable, this API only refreshes the metadata. + // It does not reload the cached data. That means, if this table is cached as + // an InMemoryRelation, we do not refresh the cached data. catalog.refreshTable(sqlParser.parseTableIdentifier(tableName)) } From 8a52ac608d1836e095cf83185be37a25696cf0c7 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 18 May 2016 11:46:15 -0700 Subject: [PATCH 7/9] update the comment. --- .../main/scala/org/apache/spark/sql/internal/CatalogImpl.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 5b8bb8ac951e..1371abe189f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -346,7 +346,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Refresh the cache entry for a metastore table, if any. + * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata + * is refreshed. 
* * @group cachemgmt * @since 2.0.0 From 2b773b823672199a685e765f5345ceb6584eb3d8 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 18 May 2016 18:03:33 -0700 Subject: [PATCH 8/9] remove the test --- sql/hivecontext-compatibility/pom.xml | 7 --- .../hive/HiveContextCompatibilitySuite.scala | 45 +------------------ 2 files changed, 1 insertion(+), 51 deletions(-) diff --git a/sql/hivecontext-compatibility/pom.xml b/sql/hivecontext-compatibility/pom.xml index 72c39a151077..ed9ef8e27919 100644 --- a/sql/hivecontext-compatibility/pom.xml +++ b/sql/hivecontext-compatibility/pom.xml @@ -48,13 +48,6 @@ test-jar test - - org.apache.spark - spark-sql_${scala.binary.version} - test-jar - ${project.version} - test - diff --git a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala index 029e8d91251c..1c1db72e27ff 100644 --- a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala +++ b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala @@ -20,17 +20,12 @@ package org.apache.spark.sql.hive import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkContext, SparkFunSuite} -import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.Utils -class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach - with SQLTestUtils { +class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach { private var sc: SparkContext = null private var hc: HiveContext = null - protected var spark: SparkSession = _ override def beforeAll(): Unit = { super.beforeAll() @@ -39,7 +34,6 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac sc.hadoopConfiguration.set(k, v) } hc = new HiveContext(sc) - spark = hc.sparkSession } override def afterEach(): Unit = { @@ -105,41 +99,4 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac assert(databases3.toSeq == Seq("default")) } - test("check change after refresh") { - val _hc = hc - import _hc.implicits._ - - withTempPath { tempDir => - withTable("jsonTable") { - (("a", "b") :: Nil).toDF().toJSON.rdd.saveAsTextFile(tempDir.getCanonicalPath) - - hc.sql( - s""" - |CREATE TABLE jsonTable - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '${tempDir.getCanonicalPath}' - |) - """.stripMargin) - - assert( - hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a", "b") :: Nil) - - Utils.deleteRecursively(tempDir) - (("a1", "b1", "c1") :: Nil).toDF().toJSON.rdd.saveAsTextFile(tempDir.getCanonicalPath) - - // Schema is cached so the new column does not show. The updated values in existing columns - // will show. 
- assert( - hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a1", "b1") :: Nil) - - hc.refreshTable("jsonTable") - - // Check that the refresh worked - assert( - hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a1", "b1", "c1") :: Nil) - } - } - } - } From 20d50556c6a3a4ca2d69f961822a2bb058edbbec Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 19 May 2016 14:32:57 -0700 Subject: [PATCH 9/9] remove refreshTable --- .../scala/org/apache/spark/sql/internal/SessionState.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 2db841a0665e..8f7c6f5d0ca4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -162,13 +162,6 @@ private[sql] class SessionState(sparkSession: SparkSession) { def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan) - def refreshTable(tableName: String): Unit = { - // Different from SparkSession.catalog.refreshTable, this API only refreshes the metadata. - // It does not reload the cached data. That means, if this table is cached as - // an InMemoryRelation, we do not refresh the cached data. - catalog.refreshTable(sqlParser.parseTableIdentifier(tableName)) - } - def invalidateTable(tableName: String): Unit = { catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName)) }