
Conversation

@mallman (Contributor) commented Jun 21, 2016

(Please note this is a revision of PR #13686, which has been closed in favor of this PR.)

This PR addresses SPARK-15968.

What changes were proposed in this pull request?

The `getCached` method of `HiveMetastoreCatalog` computes `pathsInMetastore` from the metastore relation's catalog table. This only returns the table base path, which is incomplete/inaccurate for a nonempty partitioned table. As a result, cached lookups on nonempty partitioned tables always miss.

Rather than get `pathsInMetastore` from

    metastoreRelation.catalogTable.storage.locationUri.toSeq

I modified the `getCached` method to take a `pathsInMetastore` argument. Calls to this method pass in the paths computed from calls to the Hive metastore. This is how `getCached` was implemented in Spark 1.5:

https://github.com/apache/spark/blob/e0c3212a9b42e3e704b070da4ac25b68c584427f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L444.

I also added a call in `InsertIntoHiveTable.scala` to invalidate the table in the SQL session catalog.
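
To illustrate the shape of the change, here is a minimal, self-contained sketch; the types and the cache are simplified stand-ins, not Spark's actual internals:

    // Minimal sketch, not Spark's actual code: a cached relation records the
    // paths it was built from, and a cache hit requires those paths to match
    // the paths currently in the metastore.
    object GetCachedSketch {
      case class CachedRelation(paths: Seq[String])

      private val cache = scala.collection.mutable.Map.empty[String, CachedRelation]

      // After this patch, the caller computes pathsInMetastore (for a partitioned
      // table, the partition locations fetched from the Hive metastore) and
      // passes it in, rather than getCached deriving the table base path alone.
      def getCached(table: String, pathsInMetastore: Seq[String]): Option[CachedRelation] =
        cache.get(table).filter(_.paths.toSet == pathsInMetastore.toSet)
    }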

How was this patch tested?

I've added a new unit test to `parquetSuites.scala`:

    SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached relation

Note that the only difference between this new test and the one above it in the file is that the new test populates its partitioned table with a single value, while the existing test leaves the table empty. This reveals a subtle, unexpected hole in test coverage present before this patch.

Note that I also modified a different but related unit test in `parquetSuites.scala`:

    SPARK-15248: explicitly added partitions should be readable

This unit test asserts that Spark SQL should return data from a table partition immediately after the partition is added, even when that data was placed there outside of a metastore query. I changed the test so that, instead of adding the data as a Parquet file saved in the partition's location, the data is added through a SQL `INSERT` query. I made this change because I could find no way to efficiently support partitioned table caching without failing that test.

In addition to my primary motivation, I can offer a few reasons I believe this is an acceptable weakening of that test. First, it still validates a fix for SPARK-15248, the issue for which it was written. Second, the assertion it makes is stronger than the one required for non-partitioned tables: if you write data to the storage location of a non-partitioned metastore table without using a proper SQL DML query, a subsequent query will not return that data. I believe this is an intentional limitation put in place to make table caching feasible, but I'm only speculating.
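
For concreteness, the change to that test is roughly of the following shape; this is an illustrative sketch (the table name, partition, and value are invented here, and `spark` is assumed to be a Hive-enabled `SparkSession`), not the test's exact code:

    // Before (in spirit): a Parquet file was written directly into the
    // partition's directory on the filesystem, bypassing the SQL layer.
    // After: the partition is populated through SQL, so the write goes
    // through the metastore and the session catalog.
    spark.sql("ALTER TABLE example_table ADD PARTITION (part = 1)")
    spark.sql("INSERT INTO TABLE example_table PARTITION (part = 1) SELECT 'foo'")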

Building a large `HadoopFsRelation` requires `stat`-ing all of its data files. In our environment, where we have tables with tens of thousands of partitions, the difference between using a cached relation and building a new one is a matter of seconds versus minutes. Caching partitioned table metadata vastly improves the usability of Spark SQL for these cases.

Thanks.

partitioned metastore relation when searching the internal table cache

The `getCached` method of `HiveMetastoreCatalog` computes
`pathsInMetastore` from the metastore relation's catalog table. This
only returns the table base path, which is not correct for nonempty
partitioned tables. As a result, cached lookups on nonempty partitioned
tables always miss.
@mallman (Contributor, Author) commented Jun 21, 2016

@hvanhovell I'm mentioning you here because you commented on my previous PR for this Jira issue. In response to your original question, yes, I have added a unit test for this patch.

}
}

test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
Contributor

Could you take a look at `CachedTableSuite` and add the test there (and also use a similar approach)?

Contributor Author

I looked in `CachedTableSuite`. I'm not sure that's a good place for this kind of test. That suite seems focused on testing tables cached by the `CacheManager`, whereas this patch is focused on table caching in `HiveMetastoreCatalog`.

It's difficult to find the best place for these kinds of caching tests. I chose this file because it already had some of these tests. Perhaps `HiveMetastoreCatalogSuite` would be a good candidate as an alternative?

@SparkQA commented Jun 22, 2016

Test build #3124 has finished for PR 13818 at commit 8a058c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Jun 30, 2016

cc @cloud-fan / @liancheng

    cachedDataSourceTables.getIfPresent(tableIdentifier) match {
      case null => None // Cache miss
      case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
        val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
Contributor

Sorry, I may not have enough background knowledge to understand this. Can you explain a bit more about why this doesn't work?

Contributor Author

    metastoreRelation.catalogTable.storage.locationUri.toSeq

returns the base path of the relation. This is then compared to `relation.location.paths` to validate the cached entry. For nonempty partitioned tables (by that I mean partitioned tables with one or more metastore partitions), `relation.location.paths` returns the locations of the partitions. Hence, these values will never be equal, and `useCached` will always be false.
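
A toy illustration of the mismatch, with invented paths:

    // Hypothetical locations for a table at /warehouse/t with two partitions.
    val pathsInMetastore = Seq("/warehouse/t")  // table base path only
    val relationPaths = Seq("/warehouse/t/part=0", "/warehouse/t/part=1")  // partition locations

    // The cache validation compares these path sets, so for a nonempty
    // partitioned table the check can never succeed and the lookup always misses.
    val useCached = relationPaths.toSet == pathsInMetastore.toSet  // false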

Contributor

> `relation.location.paths` returns the locations of the partitions

How does this happen?

Contributor Author

This is where the relation's paths are computed, and where the logic for empty versus nonempty partitioned tables diverges: https://github.com/VideoAmp/spark-public/blob/8a058c65c6c20e311bde5c0ade87c14c6b6b5f37/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L489-L493.

I believe this is the PR where this behavior was introduced: #13022.

@cloud-fan (Contributor) commented Jun 30, 2016

seems a reasonable fix to me, thanks for working on it!

@mallman (Contributor, Author) commented Jul 1, 2016

You are very welcome. Thank you for taking time to review it! 😃

@cloud-fan (Contributor)

LGTM, cc @liancheng

@yhuai (Contributor) commented Jul 1, 2016

ok to test

@yhuai (Contributor) commented Jul 1, 2016

test this please

@SparkQA commented Jul 1, 2016

Test build #61594 has finished for PR 13818 at commit c2ba4af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

|)
|PARTITIONED BY (part INT)
|STORED AS PARQUET
""".stripMargin)
Contributor

Nit: Indentation is off here.

@liancheng (Contributor)

LGTM except for minor styling issues. Thanks!

and tidy up the other two tests from which it was copy-pasta'd
@mallman (Contributor, Author) commented Jul 4, 2016

I believe I've addressed @liancheng's style issues in my new unit test, along with the same in the two tests from which it was copy-pasta'd (boy scout rule). Hopefully I didn't cock it up.

@SparkQA commented Jul 4, 2016

Test build #61730 has finished for PR 13818 at commit 91ef950.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@asfgit closed this in 8f6cf00 on Jul 5, 2016
@liancheng (Contributor)

Shall we also have this in branch-2.0? This seems to be a pretty serious bug. cc @rxin.

@yhuai (Contributor) commented Jul 5, 2016

I have a few questions.

  1. Is it a regression from 1.6? Looks like not?
  2. Is it a correctness issue or a performance issue? It seems to be a performance issue?
  3. If it is a performance issue, what is the impact? For a Hive Parquet/ORC table, after we convert them to Spark's native code path, there is no partition discovery. So, I guess the performance cost mainly comes from querying the metastore? If so, what will be the perf difference after `spark.sql.hive.metastorePartitionPruning` (only querying needed partition info from the Hive metastore) is enabled?

My feeling is that if it is a perf issue and it is not a regression from 1.6, merging to master should be good enough.

@zsxwing (Member) commented Jul 5, 2016

FYI this breaks Scala 2.10:

[info] Compiling 254 Scala sources and 5 Java sources to /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/mllib/target/scala-2.10/classes...
[error] /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:301: value invalidateTable is not a member of org.apache.spark.sql.catalyst.catalog.SessionCatalog
[error]     sqlContext.sessionState.catalog.invalidateTable(table.catalogTable.identifier)
[error]                                     ^
[error] one error found
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list
[info] Packaging /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/mllib/target/scala-2.10/spark-mllib_2.10-2.0.0-SNAPSHOT.jar ...

@mallman (Contributor, Author) commented Jul 5, 2016

> I have a few questions.
>
> Is it a regression from 1.6? Looks like not?

I don't know about 1.6. I know it's a regression from 1.5.

> Is it a correctness issue or a performance issue? It seems to be a performance issue?

It is a performance issue.

> If it is a performance issue, what is the impact? For a Hive Parquet/ORC table, after we convert them to Spark's native code path, there is no partition discovery. So, I guess the performance cost mainly comes from querying the metastore? If so, what will be the perf difference after `spark.sql.hive.metastorePartitionPruning` (only querying needed partition info from the Hive metastore) is enabled?

The problem this PR addresses occurs in the analysis phase of query planning. The property `spark.sql.hive.metastorePartitionPruning` only comes into play in `HiveTableScanExec`, which is part of physical planning. (And I don't believe it's used to read Parquet tables.) Therefore, that property has no bearing on this problem.

Regarding the impact, I'll quote from the last paragraph of the PR description:

> Building a large `HadoopFsRelation` requires `stat`-ing all of its data files. In our environment, where we have tables with tens of thousands of partitions, the difference between using a cached relation and building a new one is a matter of seconds versus minutes. Caching partitioned table metadata vastly improves the usability of Spark SQL for these cases.

> My feeling is that if it is a perf issue and it is not a regression from 1.6, merging to master should be good enough.

For some (like us), I'd say this extends beyond a performance issue into a usability issue. We can't use Spark 2.0 as-is if it takes us several minutes to build a query plan.

@mallman (Contributor, Author) commented Jul 5, 2016

@zsxwing I was able to do the following without error:

    git clone [email protected]:apache/spark.git spark-master
    cd spark-master
    ./dev/change-scala-version.sh 2.10
    ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package

@zsxwing (Member) commented Jul 5, 2016

@mallman never mind. 5b7a177 fixed the issue.

asfgit pushed a commit that referenced this pull request Jul 6, 2016
This PR backports your fix (#13818) to branch 2.0.


Author: Reynold Xin <[email protected]>
Author: Michael Allman <[email protected]>

Closes #14064 from yhuai/spark-15968-branch-2.0.
@mallman deleted the spark-15968 branch on August 17, 2016
