@@ -77,16 +77,24 @@ private[sql] object PartitioningUtils {
defaultPartitionName: String,
typeInference: Boolean): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
val pathsWithPartitionValues = paths.flatMap { path =>
parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
}
val (partitionValues, optBasePaths) = paths.map { path =>
parsePartition(path, defaultPartitionName, typeInference)
}.unzip

val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
Contributor

It is not very obvious what this line is doing. A comment would help.
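
To illustrate what that line does, here is a minimal sketch. It assumes plain strings in place of Hadoop's `Path`, a cut-down `PartitionValues`, and a toy `parse` function; all three are hypothetical stand-ins, not the code in this PR. It shows the per-path results being unzipped and then re-paired with only those paths that produced partition values.

```scala
// Hypothetical stand-ins: strings replace Hadoop's Path, and this
// PartitionValues carries only column names.
object PairingSketch {
  final case class PartitionValues(columnNames: Seq[String])

  // Toy parser (an assumption, not the real parsePartition): collects the
  // names of `key=value` segments and returns the path as a placeholder base path.
  def parse(path: String): (Option[PartitionValues], Option[String]) = {
    val names = path.split("/").toSeq.filter(_.contains("=")).map(_.takeWhile(_ != '='))
    (if (names.isEmpty) None else Some(PartitionValues(names)), Some(path))
  }

  val paths: Seq[String] = Seq("/table/a=1/b=2", "/table/readme")

  // One (values, basePath) pair per input path, split into two parallel sequences.
  val (partitionValues, optBasePaths) = paths.map(parse).unzip

  // Keep only the paths that yielded partition values, paired with those values.
  val pathsWithPartitionValues: Seq[(String, PartitionValues)] =
    paths.zip(partitionValues).collect { case (path, Some(values)) => path -> values }
}
```

Using `collect` with a pattern is one way to make the intent of `flatMap(x => x._2.map(x._1 -> _))` easier to read; whether it fits the codebase's style is a judgment call.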


if (pathsWithPartitionValues.isEmpty) {
// This dataset is not partitioned.
PartitionSpec.emptySpec
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
val basePaths = optBasePaths.flatMap(x => x)
assert(
basePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:" +
basePaths.mkString("\n\t", "\n\t", "\n\n"))
Contributor

Can you give an example case that hits this branch? What would basePaths be here?
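
One case that appears to hit this branch is the first "Invalid" group in the test added by this PR, where `invalidPath` sits next to `path/a=10/b=20`. The sketch below models only the distinctness check. The base-path values passed to it are a reading of the loop in `parsePartition`, not confirmed output, and `checkBasePaths` is a hypothetical helper that returns an `Either` instead of asserting.

```scala
object BasePathCheckSketch {
  // Right(basePath) when every path agrees on one base path, else Left(message).
  // Strings stand in for Hadoop's Path (an assumption for this sketch).
  def checkBasePaths(optBasePaths: Seq[Option[String]]): Either[String, String] = {
    val basePaths = optBasePaths.flatten
    basePaths.distinct match {
      case Seq(single) => Right(single)
      case _ =>
        Left("Conflicting directory structures detected. Suspicious paths:" +
          basePaths.mkString("\n\t", "\n\t", "\n\n"))
    }
  }

  // Assumed base paths for invalidPath, path/a=10/b=20, path/a=10.5/b=hello.
  val conflicting = checkBasePaths(Seq(
    Some("hdfs://host:9000/invalidPath"),
    Some("hdfs://host:9000/path"),
    Some("hdfs://host:9000/path")))

  // Assumed base paths when the other two paths are ignored as temporary directories.
  val consistent = checkBasePaths(Seq(None, Some("hdfs://host:9000/path"), None))
}
```

Under these assumptions `conflicting` is a `Left` carrying the error message, while `consistent` is `Right("hdfs://host:9000/path")`.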


val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)

// Creates the StructType which represents the partition columns.
@@ -110,12 +118,12 @@ private[sql] object PartitioningUtils {
}

/**
* Parses a single partition, returns column names and values of each partition column. For
* example, given:
* Parses a single partition, returns column names and values of each partition column, also
* the base path. For example, given:
* {{{
* path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
* }}}
* it returns:
* it returns the partition:
* {{{
* PartitionValues(
* Seq("a", "b", "c"),
@@ -124,34 +132,40 @@ private[sql] object PartitioningUtils {
* Literal.create("hello", StringType),
* Literal.create(3.14, FloatType)))
* }}}
* and the base path:
* {{{
* /path/to/partition
* }}}
*/
private[sql] def parsePartition(
path: Path,
defaultPartitionName: String,
typeInference: Boolean): Option[PartitionValues] = {
typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
Contributor

Could the return type be Option[(PartitionValues, Path)] instead, so that we simply ignore any path whose columns/values are empty?

Contributor

BTW, the function's doc comment also needs updating to describe the new return type.

Contributor

Or change case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) to case class PartitionValues(path: String, columnNames: Seq[String], literals: Seq[Literal])?

Then the code would probably be much simpler and more readable.

Member Author

But we may have only a Path with no corresponding PartitionValues, i.e., (None, Some(path)), so the return type can't simply be Option[(PartitionValues, Path)].

Member Author

A base path is not always associated with a PartitionValues. If there is no partition, we can still have a base path.

That is why I didn't change case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) into something like case class PartitionValues(path: String, columnNames: Seq[String], literals: Seq[Literal]).
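
As a rough illustration of why the tuple carries two independent options, the sketch below enumerates the result shapes that the diff appears to produce. `describe` is a hypothetical helper, strings stand in for `Path` and `Literal`, and the wording of each description is mine rather than the PR's.

```scala
object ReturnShapeSketch {
  // Simplified stand-in: literals are plain strings here (an assumption).
  final case class PartitionValues(columnNames: Seq[String], literals: Seq[String])

  // Describes each shape of (Option[PartitionValues], Option[basePath]).
  def describe(result: (Option[PartitionValues], Option[String])): String = result match {
    case (Some(values), Some(base)) =>
      s"partitioned by ${values.columnNames.mkString(", ")} under $base"
    case (None, Some(base)) =>
      s"no partition columns; base path is $base"
    case (None, None) =>
      "ignored (temporary directory)"
    case (Some(_), None) =>
      "not returned by the code in this diff"
  }
}
```

The `(None, Some(base))` case is the one that `Option[(PartitionValues, Path)]` could not represent, which matches the objection above.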

val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
var chopped = path
var basePath = path

while (!finished) {
// Sometimes (e.g., when speculative task is enabled), temporary directories may be left
// uncleaned. Here we simply ignore them.
if (chopped.getName.toLowerCase == "_temporary") {
return None
return (None, None)
}

val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
maybeColumn.foreach(columns += _)
basePath = chopped
chopped = chopped.getParent
finished = maybeColumn.isEmpty || chopped.getParent == null
finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
Contributor

Why do we need !columns.isEmpty?

Contributor

Oh, I see. It is for something like table/a=1/_temporary/something, right?
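
If that reading is right, the effect of `!columns.isEmpty` can be sketched as follows. This is a simplified model, not the PR's code: paths are plain strings split on `/`, `parseColumn` is a toy version of `parsePartitionColumn`, and the `Outcome` type is a hypothetical stand-in for the three tuple shapes.

```scala
object WalkSketch {
  sealed trait Outcome
  case object Ignored extends Outcome        // models (None, None)
  case object Unpartitioned extends Outcome  // models (None, Some(path))
  final case class Partitioned(columns: Seq[(String, String)]) extends Outcome

  // Toy column parser (an assumption): accepts only non-empty `name=value`.
  def parseColumn(name: String): Option[(String, String)] = {
    val i = name.indexOf('=')
    if (i <= 0 || i == name.length - 1) None else Some(name.take(i) -> name.drop(i + 1))
  }

  def walk(path: String): Outcome = {
    // Deepest segment first, mirroring the repeated getParent calls.
    val segments = path.split("/").filter(_.nonEmpty).reverse.toList

    def loop(rest: List[String], columns: List[(String, String)]): Outcome = rest match {
      case Nil => if (columns.isEmpty) Unpartitioned else Partitioned(columns)
      case name :: _ if name.toLowerCase == "_temporary" => Ignored
      case name :: parents =>
        parseColumn(name) match {
          case Some(column) => loop(parents, column :: columns)
          // A miss stops the walk only once at least one column has been seen.
          case None if columns.nonEmpty => Partitioned(columns)
          case None => loop(parents, columns)
        }
    }
    loop(segments, Nil)
  }
}
```

In this model `walk("table/a=1/_temporary/something")` keeps climbing past `something` because no column has been collected yet, reaches `_temporary`, and is ignored. With the old condition the walk would have stopped at `something` and never seen `_temporary`.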

}

if (columns.isEmpty) {
None
(None, Some(path))
} else {
val (columnNames, values) = columns.reverse.unzip
Some(PartitionValues(columnNames, values))
(Some(PartitionValues(columnNames, values)), Some(basePath))
}
}

@@ -58,14 +58,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
check(defaultPartitionName, Literal.create(null, NullType))
}

test("parse invalid partitioned directories") {
// Invalid
var paths = Seq(
"hdfs://host:9000/invalidPath",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/a=10.5/b=hello")

var exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))

// Valid
paths = Seq(
"hdfs://host:9000/path/_temporary",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/_temporary/path")

parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)

// Invalid
paths = Seq(
"hdfs://host:9000/path/_temporary",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/path1")

exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
Contributor

Which case covers the one I mentioned in the JIRA?


test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
}

def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
parsePartition(new Path(path), defaultPartitionName, true).get
parsePartition(new Path(path), defaultPartitionName, true)
}.getMessage

assert(message.contains(expected))