@@ -54,9 +54,16 @@ abstract class PartitioningAwareFileCatalog(
} else {
prunePartitions(filters, partitionSpec()).map {
case PartitionDirectory(values, path) =>
Partition(
values,
leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filterNot(_.getPath.getName.startsWith("_"))

case None =>
// Directory does not exist, or has not children files
Contributor Author: nit: not --> no
Nil
Contributor: For the stacktrace in the description, how did we create the path of file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition?

Contributor Author: I ran the test in this PR without the changes in source. What is your concern?
}
Partition(values, files)
}
}
}
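For context on the hunk above, here is a minimal standalone sketch (not part of the PR; the object and method names are made up) of the lookup pattern it introduces, assuming Hadoop's Path and FileStatus types as used in the surrounding code. Map.get returns an Option, so a partition whose directory is empty or missing yields Nil, instead of the "key not found" NoSuchElementException that a direct leafDirToChildrenFiles(path) apply call would throw.

import org.apache.hadoop.fs.{FileStatus, Path}

object EmptyPartitionLookupSketch {
  // Mirrors the pattern in the hunk: look the partition directory up with get, not apply.
  def filesForPartition(
      leafDirToChildrenFiles: Map[Path, Array[FileStatus]],
      partitionPath: Path): Seq[FileStatus] = {
    leafDirToChildrenFiles.get(partitionPath) match {
      case Some(children) =>
        // Skip metadata files such as _SUCCESS, as the filter in the diff does.
        children.filterNot(_.getPath.getName.startsWith("_")).toSeq
      case None =>
        // Partition directory is empty or does not exist yet: nothing to read.
        Nil
    }
  }
}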
@@ -271,8 +271,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
Some(partitionSpec))

val hadoopFsRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
val fileCatalog = new MetaStoreFileCatalog(sparkSession, paths, partitionSpec)
val fileCatalog = new MetaStorePartitionedTableFileCatalog(
sparkSession,
new Path(metastoreRelation.catalogTable.storage.locationUri.get),
partitionSpec)

val inferredSchema = if (fileType.equals("parquet")) {
val inferredSchema =
@@ -531,18 +533,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
/**
* An override of the standard HDFS listing based catalog, that overrides the partition spec with
* the information from the metastore.
* @param tableBasePath The default base path of the Hive metastore table
* @param partitionSpec The partition specifications from Hive metastore
*/
private[hive] class MetaStoreFileCatalog(
private[hive] class MetaStorePartitionedTableFileCatalog(
sparkSession: SparkSession,
paths: Seq[Path],
partitionSpecFromHive: PartitionSpec)
tableBasePath: Path,
override val partitionSpec: PartitionSpec)
extends ListingFileCatalog(
sparkSession,
paths,
MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec),
Map.empty,
Some(partitionSpecFromHive.partitionColumns)) {
Some(partitionSpec.partitionColumns)) {
}

override def partitionSpec(): PartitionSpec = partitionSpecFromHive
private[hive] object MetaStorePartitionedTableFileCatalog {
/** Get the list of paths in which to list files for a metastore table */
def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = {
// If there are no partitions currently specified then use base path,
// otherwise use the paths corresponding to the partitions.
if (partitionSpec.partitions.isEmpty) {
Seq(tableBasePath)
} else {
partitionSpec.partitions.map(_.path)
}
}
}
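As a usage note, the rule getPaths encodes is: an empty partition list falls back to the table's base path, otherwise only the registered partition locations are listed, so a partition added via ALTER TABLE ... ADD PARTITION ... LOCATION is picked up even when its directory sits outside the base path. A standalone sketch of that rule with hypothetical paths (not PR code):

import org.apache.hadoop.fs.Path

object GetPathsRuleSketch {
  // Restates the selection rule from getPaths above with plain Paths.
  def selectPathsToList(tableBasePath: Path, partitionLocations: Seq[Path]): Seq[Path] =
    if (partitionLocations.isEmpty) Seq(tableBasePath) else partitionLocations

  def main(args: Array[String]): Unit = {
    val base = new Path("/warehouse/test_added_partitions")
    // No partitions registered: list the base path itself.
    assert(selectPathsToList(base, Seq.empty) == Seq(base))
    // One partition registered at an explicit location: list only that directory.
    assert(selectPathsToList(base, Seq(new Path("/some/other/dir/b=1"))) ==
      Seq(new Path("/some/other/dir/b=1")))
  }
}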

/**
@@ -529,6 +529,40 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
}

test("SPARK-15248: explicitly added partitions should be readable") {
withTable("test_added_partitions", "test_temp") {
withTempDir { src =>
val partitionDir = new File(src, "partition").getCanonicalPath
sql(
"""
|CREATE TABLE test_added_partitions (a STRING)
|PARTITIONED BY (b INT)
|STORED AS PARQUET
""".stripMargin)

// Temp table to insert data into partitioned table
Seq("foo", "bar").toDF("a").registerTempTable("test_temp")
sql("INSERT INTO test_added_partitions PARTITION(b='0') SELECT a FROM test_temp")

checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Contributor: Let's avoid using select * in the tests :) It can introduce flakiness.
Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))

// Create partition without data files and check whether it can be read
sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))

// Add data files to partition directory and check whether they can be read
Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
}
}
}
}

/**