From 8a058c65c6c20e311bde5c0ade87c14c6b6b5f37 Mon Sep 17 00:00:00 2001
From: Michael Allman
Date: Wed, 15 Jun 2016 09:52:17 -0700
Subject: [PATCH 1/3] [SPARK-15968][SQL] HiveMetastoreCatalog does not
 correctly validate partitioned metastore relation when searching the
 internal table cache

The `getCached` method of `HiveMetastoreCatalog` computes `pathsInMetastore`
from the metastore relation's catalog table. This only returns the table base
path, which is not correct for nonempty partitioned tables. As a result,
cached lookups on nonempty partitioned tables always miss.
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  6 ++++-
 .../hive/execution/InsertIntoHiveTable.scala  |  1 +
 .../apache/spark/sql/hive/parquetSuites.scala | 24 ++++++++++++++++++-
 3 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2e0b5d59b578..1a052e62524b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -191,6 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
   private def getCached(
       tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
       metastoreRelation: MetastoreRelation,
       schemaInMetastore: StructType,
       expectedFileFormat: Class[_ <: FileFormat],
@@ -200,7 +201,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
-        val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
 
         expectedFileFormat match {
@@ -265,9 +265,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           PartitionDirectory(values, location)
         }
         val partitionSpec = PartitionSpec(partitionSchema, partitions)
+        val partitionPaths = partitions.map(_.path.toString)
+        val paths = partitionPaths.padTo(1, metastoreRelation.hiveQlTable.getDataLocation.toString)
 
         val cached = getCached(
           tableIdentifier,
+          paths,
           metastoreRelation,
           metastoreSchema,
           fileFormatClass,
@@ -312,6 +315,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
         val cached = getCached(tableIdentifier,
+          paths,
          metastoreRelation,
          metastoreSchema,
          fileFormatClass,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 97cd29f541ed..529d3887167c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -298,6 +298,7 @@ case class InsertIntoHiveTable(
 
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
+    sqlContext.sessionState.catalog.invalidateTable(table.catalogTable.identifier)
 
     // It would be nice to just return the childRdd unchanged so insert operations could be chained,
     // however for now we return an empty list to simplify compatibility checks with hive, which
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 6af9976ea0b8..3f47fc7a16c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -425,6 +425,28 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }
 
+  test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
+    "relation") {
+    withTable("partitioned") {
+      sql(
+        s"""CREATE TABLE partitioned (
+          | key INT,
+          | value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+      """.stripMargin)
+      sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
+
+      // First lookup fills the cache
+      val r1 = collectHadoopFsRelation (table("partitioned"))
+      // Second lookup should reuse the cache
+      val r2 = collectHadoopFsRelation (table("partitioned"))
+      // They should be the same instance
+      assert(r1 eq r2)
+    }
+  }
+
   test("Caching converted data source Parquet Relations") {
     def checkCached(tableIdentifier: TableIdentifier): Unit = {
       // Converted test_parquet should be cached.
@@ -557,7 +579,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
 
     // Add data files to partition directory and check whether they can be read
-    Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+    sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
     checkAnswer(
       sql("SELECT * FROM test_added_partitions"),
       Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))

From c2ba4af116c9e999f2fa2f68868b72648b4234c7 Mon Sep 17 00:00:00 2001
From: Michael Allman
Date: Thu, 30 Jun 2016 17:41:17 -0700
Subject: [PATCH 2/3] Clarify the definition of a partitioned table's file
 paths

---
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1a052e62524b..2572f868cb76 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -266,7 +266,17 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         }
         val partitionSpec = PartitionSpec(partitionSchema, partitions)
         val partitionPaths = partitions.map(_.path.toString)
-        val paths = partitionPaths.padTo(1, metastoreRelation.hiveQlTable.getDataLocation.toString)
+
+        // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
+        // partitioned table's paths depends on whether that table has any actual partitions.
+        // Partitioned tables without partitions use the location of the table's base path.
+        // Partitioned tables with partitions use the locations of those partitions' data locations,
+        // _omitting_ the table's base path.
+        val paths = if (partitionPaths.isEmpty) {
+          Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+        } else {
+          partitionPaths
+        }
 
         val cached = getCached(
           tableIdentifier,

From 91ef9508a2cb992772547bc2cc5c01c63b41abd4 Mon Sep 17 00:00:00 2001
From: Michael Allman
Date: Mon, 4 Jul 2016 10:19:51 -0700
Subject: [PATCH 3/3] Fix indentation issues in test for SPARK-15968 in
 parquetSuites.scala, and tidy up the other two tests from which it was
 copy-pasta'd

---
 .../apache/spark/sql/hive/parquetSuites.scala | 55 ++++++++++---------
 1 file changed, 29 insertions(+), 26 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 3f47fc7a16c0..ef1230df834e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -389,17 +389,18 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
   test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
     withTable("nonPartitioned") {
       sql(
-        s"""CREATE TABLE nonPartitioned (
-          | key INT,
-          | value STRING
-          |)
-          |STORED AS PARQUET
-      """.stripMargin)
+        """
+          |CREATE TABLE nonPartitioned (
+          | key INT,
+          | value STRING
+          |)
+          |STORED AS PARQUET
+        """.stripMargin)
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("nonPartitioned"))
+      val r1 = collectHadoopFsRelation(table("nonPartitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("nonPartitioned"))
+      val r2 = collectHadoopFsRelation(table("nonPartitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
@@ -408,18 +409,19 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
   test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
     withTable("partitioned") {
       sql(
-        s"""CREATE TABLE partitioned (
-          | key INT,
-          | value STRING
-          |)
-          |PARTITIONED BY (part INT)
-          |STORED AS PARQUET
-      """.stripMargin)
+        """
+          |CREATE TABLE partitioned (
+          | key INT,
+          | value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+        """.stripMargin)
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("partitioned"))
+      val r1 = collectHadoopFsRelation(table("partitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("partitioned"))
+      val r2 = collectHadoopFsRelation(table("partitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
@@ -429,19 +431,20 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     "relation") {
     withTable("partitioned") {
       sql(
-        s"""CREATE TABLE partitioned (
-          | key INT,
-          | value STRING
-          |)
-          |PARTITIONED BY (part INT)
-          |STORED AS PARQUET
-      """.stripMargin)
+        """
+          |CREATE TABLE partitioned (
+          | key INT,
+          | value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+        """.stripMargin)
       sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
 
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("partitioned"))
+      val r1 = collectHadoopFsRelation(table("partitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("partitioned"))
+      val r2 = collectHadoopFsRelation(table("partitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
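
Reviewer note (not part of the patch series): the following is a minimal, self-contained Scala sketch of the path-selection convention these patches adopt, using hypothetical stand-in types (MetastoreTable, CachedRelation) rather than Spark's actual classes. It illustrates why the pre-patch getCached comparison, which always used the table's base path, could never match the cached entry for a nonempty partitioned table, and why comparing against the partition locations instead makes the lookup hit.

object CachedLookupSketch {

  // Simplified stand-ins (hypothetical) for the metastore relation and the cached relation.
  final case class MetastoreTable(basePath: String, partitionPaths: Seq[String])
  final case class CachedRelation(paths: Set[String])

  // Convention from PATCH 2: a partitioned table with no partitions is identified by its
  // base path; one with partitions is identified by its partition locations only.
  def pathsInMetastore(table: MetastoreTable): Seq[String] =
    if (table.partitionPaths.isEmpty) Seq(table.basePath) else table.partitionPaths

  // A cached entry is reused only when its paths match the expected metastore paths.
  def getCached(
      cache: Map[String, CachedRelation],
      key: String,
      table: MetastoreTable): Option[CachedRelation] =
    cache.get(key).filter(_.paths == pathsInMetastore(table).toSet)

  def main(args: Array[String]): Unit = {
    val table = MetastoreTable(
      basePath = "/warehouse/partitioned",
      partitionPaths = Seq("/warehouse/partitioned/part=0"))

    // The cache holds the relation built from the partition locations.
    val cache = Map(
      "default.partitioned" -> CachedRelation(Set("/warehouse/partitioned/part=0")))

    // Pre-patch behavior: the expected paths were always the base path, so the comparison
    // failed for every nonempty partitioned table, i.e. a permanent cache miss.
    val prePatchExpected = Seq(table.basePath).toSet
    assert(cache("default.partitioned").paths != prePatchExpected)

    // Post-patch behavior: the expected paths are the partition locations, so the lookup hits.
    assert(getCached(cache, "default.partitioned", table).isDefined)
  }
}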