Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

private def getCached(
tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[String],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this argument was in 1.5, can you find out the PR that removed it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the following commit does not remove that argument, it appears to be the one that changes the behavior for how partitioned tables are looked up in the cache:

e720dda#diff-ee66e11b56c21364760a5ed2b783f863R508

I'm not sure how to find a PR in GitHub associated with a given commit.

metastoreRelation: MetastoreRelation,
schemaInMetastore: StructType,
expectedFileFormat: Class[_ <: FileFormat],
Expand All @@ -200,7 +201,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I may not have enough background knowledge to understand this, can you explain a bit more about why this doesn't work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metastoreRelation.catalogTable.storage.locationUri.toSeq

returns the base path of the relation. This is then compared to relation.location.paths to validate the cached entry. For non-empty partitioned tables (by that I mean partitioned tables with one or more metastore partitions), relation.location.paths returns the locations of the partitions. Hence, these values will never be equal and useCached will always be false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relation.location.paths returns the locations of the partitions

How does this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the relation's paths are computed and the logic for empty versus non-empty partitioned tables diverges: https://github.com/VideoAmp/spark-public/blob/8a058c65c6c20e311bde5c0ade87c14c6b6b5f37/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L489-L493.

I believe this is the PR where this behavior was introduced: #13022.

val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
val cachedRelationFileFormatClass = relation.fileFormat.getClass

expectedFileFormat match {
Expand Down Expand Up @@ -265,9 +265,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
PartitionDirectory(values, location)
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val partitionPaths = partitions.map(_.path.toString)

// By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
// partitioned table's paths depends on whether that table has any actual partitions.
// Partitioned tables without partitions use the location of the table's base path.
// Partitioned tables with partitions use the locations of those partitions' data locations,
// _omitting_ the table's base path.
val paths = if (partitionPaths.isEmpty) {
Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
} else {
partitionPaths
}

val cached = getCached(
tableIdentifier,
paths,
metastoreRelation,
metastoreSchema,
fileFormatClass,
Expand Down Expand Up @@ -312,6 +325,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)

val cached = getCached(tableIdentifier,
paths,
metastoreRelation,
metastoreSchema,
fileFormatClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ case class InsertIntoHiveTable(

// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)
sqlContext.sessionState.catalog.invalidateTable(table.catalogTable.identifier)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mind explain a bit more about this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially this is a "callback" to the session catalog to invalidate this table in the catalog's table cache, because we just appended to the underlying table data. In the context of a Hive query, the session catalog's table cache is HiveMetastoreCatalog.cachedDataSourceTables. The results of the INSERT will be invisible in the current session until the table is invalidated.

Another way to think about this code is that it's precisely what makes the following test snippet work: https://github.com/VideoAmp/spark-public/blob/spark-15968/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala#L582-L585

Does that answer your question?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah so essentially we have 2 caches to invalidate: the data of table, the metadata of table

Copy link

@erfangc erfangc Aug 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mallman does this change cause new sessions (ex: external App to the ThriftServer via JDBC) to not see the cached tables? I noticed this in the released version 2.0.0 whereby CACHE TABLE in one session has no effect on new sessions. Future SQL statements are still reading the underlying Parquet files from Disk (as evidenced by tasks being NODE_LOCAL and RACK_LOCAL instead of PROCESS_LOCAL). Sorry if this question is unrelated to your patch, but this became a major issue in 2.0.0 for us, where as in 1.6.2 we do not have an issue.

p.s. the tables I am referring to in my examples are unpartitioned. Does sqlContext.sessionState.catalog.invalidateTable somehow invalidates unpartitioned tables?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create a JIRA for this? Seems it's not related to this PR

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will do - thought so too as this relates to InsertIntoHive was a hail mary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erfangc I concur with @cloud-fan.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need to invalidate the cache for partitioned tables.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are facing the same issue in LOAD.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me submit a PR to fix these issues.


// It would be nice to just return the childRdd unchanged so insert operations could be chained,
// however for now we return an empty list to simplify compatibility checks with hive, which
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,18 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
withTable("nonPartitioned") {
sql(
s"""CREATE TABLE nonPartitioned (
| key INT,
| value STRING
|)
|STORED AS PARQUET
""".stripMargin)
"""
|CREATE TABLE nonPartitioned (
| key INT,
| value STRING
|)
|STORED AS PARQUET
""".stripMargin)

// First lookup fills the cache
val r1 = collectHadoopFsRelation (table("nonPartitioned"))
val r1 = collectHadoopFsRelation(table("nonPartitioned"))
// Second lookup should reuse the cache
val r2 = collectHadoopFsRelation (table("nonPartitioned"))
val r2 = collectHadoopFsRelation(table("nonPartitioned"))
// They should be the same instance
assert(r1 eq r2)
}
Expand All @@ -408,18 +409,42 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
withTable("partitioned") {
sql(
s"""CREATE TABLE partitioned (
| key INT,
| value STRING
|)
|PARTITIONED BY (part INT)
|STORED AS PARQUET
""".stripMargin)
"""
|CREATE TABLE partitioned (
| key INT,
| value STRING
|)
|PARTITIONED BY (part INT)
|STORED AS PARQUET
""".stripMargin)

// First lookup fills the cache
val r1 = collectHadoopFsRelation(table("partitioned"))
// Second lookup should reuse the cache
val r2 = collectHadoopFsRelation(table("partitioned"))
// They should be the same instance
assert(r1 eq r2)
}
}

test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you take a look a CachedTableSuite and add the test there (and also use a similar approach).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked in CachedTableSuite. I'm not sure that's a good place for this kind of test. That test suite seems focused on testing tables cached by the CacheManager. This patch is focused on table caching in HiveMetastoreCatalog.

It's difficult to find the best place for these kinds of caching tests. I chose this file because it already had some of these tests. Perhaps HiveMetastoreCatalogSuite would be a good candidate for an alternative?

"relation") {
withTable("partitioned") {
sql(
"""
|CREATE TABLE partitioned (
| key INT,
| value STRING
|)
|PARTITIONED BY (part INT)
|STORED AS PARQUET
""".stripMargin)
sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")

// First lookup fills the cache
val r1 = collectHadoopFsRelation (table("partitioned"))
val r1 = collectHadoopFsRelation(table("partitioned"))
// Second lookup should reuse the cache
val r2 = collectHadoopFsRelation (table("partitioned"))
val r2 = collectHadoopFsRelation(table("partitioned"))
// They should be the same instance
assert(r1 eq r2)
}
Expand Down Expand Up @@ -557,7 +582,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))

// Add data files to partition directory and check whether they can be read
Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
Expand Down