From be5522d24a39da20fc8d08b13e15def8e9139cc3 Mon Sep 17 00:00:00 2001
From: Zhan Zhang
Date: Mon, 31 Aug 2015 22:01:18 -0700
Subject: [PATCH] SPARK-10304: throw error when the table directory is invalid

---
 .../datasources/PartitioningUtils.scala          | 16 +++++++++++++---
 .../apache/spark/sql/sources/interfaces.scala    |  3 ++-
 .../parquet/ParquetPartitionDiscoverySuite.scala | 12 ++++++++++++
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 0a2007e15843..891ab6735a17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -49,6 +49,10 @@ private[sql] object PartitioningUtils {
     require(columnNames.size == literals.size)
   }
 
+  private[sql] object PartitionValues {
+    val empty = PartitionValues(Seq.empty, Seq.empty)
+  }
+
   /**
    * Given a group of qualified paths, tries to parse them and returns a partition specification.
    * For example, given:
@@ -81,7 +85,14 @@ private[sql] object PartitioningUtils {
       parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
     }
 
-    if (pathsWithPartitionValues.isEmpty) {
+    val ep = pathsWithPartitionValues.filter(_._2 == PartitionValues.empty)
+    // Make sure all paths are either partitioned or non-partitioned.
+    // Column-level consistency is validated later by resolvePartitions.
+    assert(ep.length == 0 || ep.length == pathsWithPartitionValues.length,
+      s"Conflicting directory structures detected: ${ep.length} non-partitioned paths " +
+        s"and ${pathsWithPartitionValues.length - ep.length} partitioned paths")
+
+    if (pathsWithPartitionValues.isEmpty || ep.length == pathsWithPartitionValues.length) {
       // This dataset is not partitioned.
       PartitionSpec.emptySpec
     } else {
@@ -146,9 +157,8 @@ private[sql] object PartitioningUtils {
       chopped = chopped.getParent
       finished = maybeColumn.isEmpty || chopped.getParent == null
     }
-
     if (columns.isEmpty) {
-      None
+      Some(PartitionValues.empty)
     } else {
       val (columnNames, values) = columns.reverse.unzip
       Some(PartitionValues(columnNames, values))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 7b030b7d73bd..a7e1f2c9286e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -436,7 +436,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
           Try(fs.listStatus(qualified)).getOrElse(Array.empty)
         }.filterNot { status =>
           val name = status.getPath.getName
-          name.toLowerCase == "_temporary" || name.startsWith(".")
+          // Is it safe to replace "_temporary" with "_"?
+          name.startsWith(".") || name.startsWith("_")
         }
 
         val (dirs, files) = statuses.partition(_.isDir)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index ed8bafb10c60..7ab49dea206d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -103,6 +103,18 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
   }
 
+  test("parse invalid partitioned directories") {
+    val paths = Seq(
+      "hdfs://host:9000/invalidPath",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello")
+
+    val exception = intercept[Exception] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures detected"))
+  }
+
   test("parse partitions") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
       assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
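A minimal sketch of the user-facing behavior this patch introduces, assuming a Spark 1.5-era sqlContext; the table root hdfs://host:9000/path, the bare file name, and the part-file name below are hypothetical, not taken from this patch:

    // Hypothetical layout under one table root:
    //   hdfs://host:9000/path/invalidFile            -> parses to no partition columns
    //   hdfs://host:9000/path/a=10/b=20/part-00000   -> parses to columns a, b
    // Previously the non-partitioned path was silently dropped during partition
    // discovery; with the assert added to parsePartitions, the read fails fast:
    val df = sqlContext.read.parquet("hdfs://host:9000/path")
    // java.lang.AssertionError: assertion failed: Conflicting directory
    // structures detected: 1 non-partitioned paths and 1 partitioned paths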