[SPARK-15248][SQL] Make MetastoreFileCatalog consider directories from partition specs of a partitioned metastore table #13022
Conversation
Test build #58224 has finished for PR 13022 at commit

@yhuai Please take a look.
```scala
/** Get the list of non-overlapping paths to list files for a metastore table */
def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = {
  val basePathStr = tableBasePath.toUri.toString
  val partitionsOutsideBasePath = partitionSpec
```
@yin this is the hacky thing. I want to make sure that the seq of paths does not contain both `tableBasePath` as well as `tableBasePath/partition=1`. Having both the parent dir and a sub dir might cause the listing to do duplicate work.

Option 1: Use a substring match to eliminate partition paths that are already covered by the base path. This is what I have done.

Option 2: (just occurred to me, I think this is cleaner)

```scala
if (partitionSpec.partitions.nonEmpty) {
  partitionSpec.partitions.map(_.path)
} else {
  Seq(tableBasePath)
}
```

Which do you think is better? Note that the list of paths determines the match with cached HadoopFsRelation objects.
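For comparison, here is a minimal sketch of how Option 1's substring match could look; the helper name and exact filtering are illustrative, not the PR's final code:

```scala
import org.apache.hadoop.fs.Path

// Hypothetical sketch of Option 1: keep the table base path and add only
// those partition paths that are not already under it (substring match),
// so the listing never covers the same directory twice.
def getPathsBySubstringMatch(tableBasePath: Path, partitionPaths: Seq[Path]): Seq[Path] = {
  val basePathStr = tableBasePath.toUri.toString
  // Partitions located outside the base path must be listed separately.
  val partitionsOutsideBasePath =
    partitionPaths.filterNot(_.toUri.toString.startsWith(basePathStr))
  tableBasePath +: partitionsOutsideBasePath
}
```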
I spent some time analyzing your problem and I agree with your second option. Why am I included in this discussion, anyway? I never contributed to Spark.
Oh, sorry. @tdas was trying to cc me.
Test build #58284 has finished for PR 13022 at commit
```scala
 * @param tableBasePath The default base path of the Hive metastore table
 * @param partitionSpec The partition specifications from Hive metastore
 */
private[hive] class MetaStoreFileCatalog(
```
@yin I am inclined to rename this class to MetastorePartitionedTableFileCatalog, as it is only used for partitioned tables and therefore does not need to worry about what files to return when the table is not partitioned.
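Under that renaming, the path computation could collapse to Option 2 from the earlier comment; a rough sketch (the object name follows the proposed rename, everything else is illustrative):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.PartitionSpec

// Hypothetical sketch: for a partitioned metastore table, list exactly the
// directories named in the partition spec; fall back to the table's base
// path only when no partitions have been registered yet.
private[hive] object MetastorePartitionedTableFileCatalog {
  def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = {
    if (partitionSpec.partitions.nonEmpty) {
      partitionSpec.partitions.map(_.path)
    } else {
      Seq(tableBasePath)
    }
  }
}
```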
Test build #58297 has finished for PR 13022 at commit
```scala
case None =>
  // Directory does not exist, or has no child files
  Nil
```
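The stack trace in the description shows an unconditional `Map.apply` (`MapLike$class.apply` → `default` → `NoSuchElementException`) on a directory the catalog never listed; the `case None => Nil` arm above is the defensive half of a lookup that might be sketched like this (names assumed, not the PR's exact code):

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// Hypothetical sketch: resolve a partition directory to its files without
// throwing when the directory was never listed, e.g. because it does not
// exist or has no child files.
def filesUnder(leafDirToChildrenFiles: Map[Path, Array[FileStatus]], dir: Path): Seq[FileStatus] = {
  leafDirToChildrenFiles.get(dir) match {
    case Some(files) => files.toSeq
    case None =>
      // Directory does not exist, or has no child files
      Nil
  }
}
```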
For the stacktrace in the description, how did we create the path of file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition?
I ran the test in this PR without the source changes.
What is your concern?
LGTM. Merging to master and branch 2.0
[SPARK-15248][SQL] Make MetastoreFileCatalog consider directories from partition specs of a partitioned metastore table

Table partitions can be added with locations different from the default warehouse location of a Hive table.

```sql
CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet
ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'
```

Querying such a table throws an error because the MetastoreFileCatalog does not list the added partition directory; it only lists the default base location.

```
[info] - SPARK-15248: explicitly added partitions should be readable *** FAILED *** (1 second, 8 milliseconds)
[info]   java.util.NoSuchElementException: key not found: file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition
[info]   at scala.collection.MapLike$class.default(MapLike.scala:228)
[info]   at scala.collection.AbstractMap.default(Map.scala:59)
[info]   at scala.collection.MapLike$class.apply(MapLike.scala:141)
[info]   at scala.collection.AbstractMap.apply(Map.scala:59)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:59)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:55)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog.listFiles(PartitioningAwareFileCatalog.scala:55)
[info]   at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:93)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:55)
[info]   at org.apache.spark.sql.execution.SparkStrategies$SpecialLimits$.apply(SparkStrategies.scala:55)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:77)
[info]   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
[info]   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:330)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:146)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:159)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:554)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:535)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:125)
[info]   at org.apache.spark.sql.hive.ParquetPartitioningTest.withTempDir(parquetSuites.scala:726)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7.apply$mcV$sp(parquetSuites.scala:535)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:166)
[info]   at org.apache.spark.sql.hive.ParquetPartitioningTest.withTable(parquetSuites.scala:726)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply$mcV$sp(parquetSuites.scala:534)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
```

The solution in this PR is to get the paths to list from the partition spec and not rely on the default table path alone.

unit tests.

Author: Tathagata Das <[email protected]>

Closes #13022 from tdas/SPARK-15248.

(cherry picked from commit 81c68ec)

Signed-off-by: Yin Huai <[email protected]>
What changes were proposed in this pull request?
Table partitions can be added with locations different from the default warehouse location of a Hive table.

```sql
CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet
ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/path/1'
```

Querying such a table throws an error because the MetastoreFileCatalog does not list the added partition directory; it only lists the default base location.

The solution in this PR is to get the paths to list from the partition spec and not rely on the default table path alone.
How was this patch tested?
unit tests.
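A sketch of the kind of regression test this describes, assuming the `withTable`/`withTempDir`/`checkAnswer` helpers visible in the stack trace and a Hive-enabled session; data values and file layout are illustrative:

```scala
import org.apache.spark.sql.Row

// Hypothetical sketch of the test scenario: a partition registered at a
// location outside the table's default base directory must be readable.
withTable("parquetTable") {
  withTempDir { dir =>
    sql("CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet")
    // Write parquet data directly into the external partition directory
    spark.range(3).selectExpr("CAST(id AS INT) AS a")
      .write.mode("overwrite").parquet(dir.getCanonicalPath)
    // Register that directory as partition b=1
    sql(s"ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '${dir.toURI}'")
    // Before this fix, the query failed with NoSuchElementException because
    // the file catalog listed only the table's default base location
    checkAnswer(sql("SELECT a FROM parquetTable WHERE b = 1"), (0 until 3).map(Row(_)))
  }
}
```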