Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ private[sql] object PartitioningUtils {
require(columnNames.size == literals.size)
}

// Companion holding the sentinel for a path from which no partition columns could be
// parsed — i.e. the path belongs to a non-partitioned dataset. Used to distinguish
// "parsed, but not partitioned" from a failed parse when validating directory layouts.
private[sql] object PartitionValues {
// Empty column-name and literal sequences; compared against parse results to detect
// a mix of partitioned and non-partitioned files under the same root.
val empty = PartitionValues(Seq.empty, Seq.empty)
}

/**
* Given a group of qualified paths, tries to parse them and returns a partition specification.
* For example, given:
Expand Down Expand Up @@ -81,7 +85,14 @@ private[sql] object PartitioningUtils {
parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
}

if (pathsWithPartitionValues.isEmpty) {
val ep = pathsWithPartitionValues.filter(_._2 == PartitionValues.empty)
// Make sure all data are either partitioned or non-partitioned.
// The consistency detail will be validated by resolvePartitions
assert(ep.length == 0 || ep.length == pathsWithPartitionValues.length,
s"Conflicting directory structures detected with ${ep.length} non-partitioned files " +
s"and ${pathsWithPartitionValues.length - ep.length} partitioned files")

if (pathsWithPartitionValues.isEmpty || ep.length == pathsWithPartitionValues.length) {
// This dataset is not partitioned.
PartitionSpec.emptySpec
} else {
Expand Down Expand Up @@ -146,9 +157,8 @@ private[sql] object PartitioningUtils {
chopped = chopped.getParent
finished = maybeColumn.isEmpty || chopped.getParent == null
}

if (columns.isEmpty) {
None
Some(PartitionValues.empty)
} else {
val (columnNames, values) = columns.reverse.unzip
Some(PartitionValues(columnNames, values))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
Try(fs.listStatus(qualified)).getOrElse(Array.empty)
}.filterNot { status =>
val name = status.getPath.getName
name.toLowerCase == "_temporary" || name.startsWith(".")
// Is it safe to replace "_temporary" with "_"?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We need to preserve files like Parquet summary files (_metadata and _common_metadata).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments. It seems there are a lot of corner cases to be covered by the test case. For example, the 1st is valid, but the 2nd is not:
1st:
"hdfs://host:9000/path/_temporary",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/_temporary/path",
2nd:
"hdfs://host:9000/path/_temporary",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/path1",
Adding a PartitionValues.empty does not solve the problem. I will close this PR and investigate other approaches.

name.startsWith(".") || name.startsWith("_")
}

val (dirs, files) = statuses.partition(_.isDir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
}

test("parse invalid partitioned directories") {
  // Mixing a non-partitioned path with partitioned ones under the same root must
  // be rejected with a "Conflicting directory structures detected" error.
  val conflictingPaths = Seq(
    "hdfs://host:9000/invalidPath",
    "hdfs://host:9000/path/a=10/b=20",
    "hdfs://host:9000/path/a=10.5/b=hello")

  val thrown = intercept[Exception] {
    parsePartitions(conflictingPaths.map(new Path(_)), defaultPartitionName, true)
  }
  assert(thrown.getMessage().contains("Conflicting directory structures detected"))
}

test("parse partitions") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
Expand Down