From 2d13156c32002f65acc1bfccf09e08ef0063aa73 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 19 Sep 2015 21:01:01 +0800
Subject: [PATCH 1/4] Detect invalid partition directory.

---
 .../datasources/PartitioningUtils.scala      | 39 ++++++++++++++-----
 .../ParquetPartitionDiscoverySuite.scala     | 36 ++++++++++++++++-
 2 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 0a2007e15843..57e01c05b5a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -77,8 +77,16 @@ private[sql] object PartitioningUtils {
       defaultPartitionName: String,
       typeInference: Boolean): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
-    val pathsWithPartitionValues = paths.flatMap { path =>
-      parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
+    val partitionValuesWithBasePaths = paths.map { path =>
+      path -> parsePartition(path, defaultPartitionName, typeInference)
+    }
+
+    val pathsWithPartitionValues = partitionValuesWithBasePaths.flatMap { pathAndPart =>
+      pathAndPart._2._1.map(part => pathAndPart._1 -> part)
+    }
+
+    val basePaths = partitionValuesWithBasePaths.flatMap { pathAndPart =>
+      pathAndPart._2._2
     }

     if (pathsWithPartitionValues.isEmpty) {
@@ -87,7 +95,7 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions have the same
       // partition columns and resolve potential type conflicts.
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, basePaths)

       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -128,30 +136,32 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartition(
       path: Path,
       defaultPartitionName: String,
-      typeInference: Boolean): Option[PartitionValues] = {
+      typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
     var chopped = path
+    var basePath = path

     while (!finished) {
       // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
       // uncleaned. Here we simply ignore them.
       if (chopped.getName.toLowerCase == "_temporary") {
-        return None
+        return (None, None)
       }

       val maybeColumn =
         parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
       maybeColumn.foreach(columns += _)
+      basePath = chopped
       chopped = chopped.getParent
-      finished = maybeColumn.isEmpty || chopped.getParent == null
+      finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
     }

     if (columns.isEmpty) {
-      None
+      (None, Some(path))
     } else {
       val (columnNames, values) = columns.reverse.unzip
-      Some(PartitionValues(columnNames, values))
+      (Some(PartitionValues(columnNames, values)), Some(basePath))
     }
   }

@@ -184,7 +194,8 @@ private[sql] object PartitioningUtils {
    * }}}
    */
   private[sql] def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      basePaths: Seq[Path]): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
@@ -193,6 +204,10 @@ private[sql] object PartitioningUtils {
         distinctPartColNames.size == 1,
         listConflictingPartitionColumns(pathsWithPartitionValues))

+      assert(
+        basePaths.distinct.size == 1,
+        throwErrorForInvalidPartition(basePaths))
+
       // Resolves possible type conflicts for each column
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
@@ -207,6 +222,12 @@ private[sql] object PartitioningUtils {
     }
   }

+  private[sql] def throwErrorForInvalidPartition(
+      basePaths: Seq[Path]): String = {
+    "Conflicting directory structures detected. Suspicious paths:\n" +
+      basePaths.mkString("\n\t", "\n\t", "\n\n")
+  }
+
   private[sql] def listConflictingPartitionColumns(
       pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
     val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 7bac8609e1b9..6536e92336bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -58,14 +58,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     check(defaultPartitionName, Literal.create(null, NullType))
   }

+  test("parse invalid partitioned directories") {
+    // Invalid
+    var paths = Seq(
+      "hdfs://host:9000/invalidPath",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello")
+
+    var exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures detected"))
+
+    // Valid
+    paths = Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/_temporary/path")
+
+    parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+
+    // Invalid
+    paths = Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/path1")
+
+    exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures detected"))
+  }
+
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
+      assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
     }

     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName, true).get
+        parsePartition(new Path(path), defaultPartitionName, true)
       }.getMessage

       assert(message.contains(expected))

From 779fbd264092e8b4ff0ab5472d34db2d01a971f3 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 24 Oct 2015 19:42:34 +0800
Subject: [PATCH 2/4] For comment.

---
 .../datasources/PartitioningUtils.scala | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 57e01c05b5a4..ae1effe3055f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -95,6 +95,12 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions have the same
       // partition columns and resolve potential type conflicts.
+
+      assert(
+        basePaths.distinct.size == 1,
+        "Conflicting directory structures detected. Suspicious paths:\n" +
+          basePaths.mkString("\n\t", "\n\t", "\n\n"))
+
       val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, basePaths)

       // Creates the StructType which represents the partition columns.
@@ -204,10 +210,6 @@ private[sql] object PartitioningUtils {
         distinctPartColNames.size == 1,
         listConflictingPartitionColumns(pathsWithPartitionValues))

-      assert(
-        basePaths.distinct.size == 1,
-        throwErrorForInvalidPartition(basePaths))
-
       // Resolves possible type conflicts for each column
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
@@ -222,12 +224,6 @@ private[sql] object PartitioningUtils {
     }
   }

-  private[sql] def throwErrorForInvalidPartition(
-      basePaths: Seq[Path]): String = {
-    "Conflicting directory structures detected. Suspicious paths:\n" +
-      basePaths.mkString("\n\t", "\n\t", "\n\n")
-  }
-
   private[sql] def listConflictingPartitionColumns(
       pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
     val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct

From cdf6dc424abba99a7fd091fca5ce2af56255f69a Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sun, 25 Oct 2015 16:12:19 +0800
Subject: [PATCH 3/4] Update function description.

---
 .../sql/execution/datasources/PartitioningUtils.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index ae1effe3055f..5ef100ffde8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -124,12 +124,12 @@ private[sql] object PartitioningUtils {
   }

   /**
-   * Parses a single partition, returns column names and values of each partition column. For
-   * example, given:
+   * Parses a single partition, returns column names and values of each partition column, as
+   * well as the base path. For example, given:
    * {{{
    *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    * }}}
-   * it returns:
+   * it returns the partition:
    * {{{
    *   PartitionValues(
    *     Seq("a", "b", "c"),
    *     Seq(
    *       Literal.create(42, IntegerType),
    *       Literal.create("hello", StringType),
    *       Literal.create(3.14, FloatType)))
    * }}}
+   * and the base path:
+   * {{{
+   *   /path/to/partition
+   * }}}
    */
   private[sql] def parsePartition(
       path: Path,

From 3db42268b7948102278427f73d48e3ebb3196924 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 3 Nov 2015 14:38:42 +0800
Subject: [PATCH 4/4] For comments.

---
 .../datasources/PartitioningUtils.scala | 21 +++++++--------------
 1 file changed, 7 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 5ef100ffde8d..628c5e18936c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -77,17 +77,11 @@ private[sql] object PartitioningUtils {
       defaultPartitionName: String,
       typeInference: Boolean): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
-    val partitionValuesWithBasePaths = paths.map { path =>
-      path -> parsePartition(path, defaultPartitionName, typeInference)
-    }
+    val (partitionValues, optBasePaths) = paths.map { path =>
+      parsePartition(path, defaultPartitionName, typeInference)
+    }.unzip

-    val pathsWithPartitionValues = partitionValuesWithBasePaths.flatMap { pathAndPart =>
-      pathAndPart._2._1.map(part => pathAndPart._1 -> part)
-    }
-
-    val basePaths = partitionValuesWithBasePaths.flatMap { pathAndPart =>
-      pathAndPart._2._2
-    }
+    val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))

     if (pathsWithPartitionValues.isEmpty) {
       // This dataset is not partitioned.
@@ -95,13 +89,13 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions have the same
       // partition columns and resolve potential type conflicts.
-
+      val basePaths = optBasePaths.flatMap(x => x)
       assert(
         basePaths.distinct.size == 1,
         "Conflicting directory structures detected. Suspicious paths:\n" +
           basePaths.mkString("\n\t", "\n\t", "\n\n"))

-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, basePaths)
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)

       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -204,8 +198,7 @@ private[sql] object PartitioningUtils {
    * }}}
    */
   private[sql] def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
-      basePaths: Seq[Path]): Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
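Note for reviewers: below is a minimal, self-contained Scala sketch (not part of the patch series) of the base-path consistency check that the final revision performs in parsePartitions. The names `BasePathCheckSketch` and `splitPartition` are invented for illustration, and plain strings stand in for org.apache.hadoop.fs.Path; the real parsePartition walks the path upward via Path.getParent instead of splitting strings.

```scala
// Illustrative sketch only -- not part of the patch; all names are hypothetical.
object BasePathCheckSketch {

  // Simplified stand-in for PartitioningUtils.parsePartition: strips trailing
  // "name=value" segments off a path and returns (partition key/values, base path).
  def splitPartition(path: String): (Option[Seq[(String, String)]], Option[String]) = {
    val segments = new java.net.URI(path).getPath.split("/").filter(_.nonEmpty).toSeq
    // Leftover temporary directories are skipped entirely, as in the patch.
    if (segments.contains("_temporary")) return (None, None)
    // Walk from the leaf upward: the partition segments are the maximal
    // "name=value" suffix; everything above them is the base path.
    val (partSegments, baseSegments) = segments.reverse.span(_.contains("="))
    val basePath = "/" + baseSegments.reverse.mkString("/")
    if (partSegments.isEmpty) {
      (None, Some(basePath)) // unpartitioned: the whole path is "base"
    } else {
      val columns = partSegments.reverse.map { seg =>
        val i = seg.indexOf('=')
        seg.substring(0, i) -> seg.substring(i + 1)
      }
      (Some(columns), Some(basePath))
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the first case of the new test: one unpartitioned path mixed
    // in with partitioned paths must be reported as a conflict.
    val paths = Seq(
      "hdfs://host:9000/invalidPath",
      "hdfs://host:9000/path/a=10/b=20",
      "hdfs://host:9000/path/a=10.5/b=hello")

    val (_, optBasePaths) = paths.map(splitPartition).unzip
    val basePaths = optBasePaths.flatten

    // Same condition the patch asserts in parsePartitions: every discovered
    // base path must be identical, otherwise the directory layout is ambiguous.
    if (basePaths.distinct.size != 1) {
      println("Conflicting directory structures detected. Suspicious paths:" +
        basePaths.distinct.mkString("\n\t", "\n\t", "\n"))
    }
  }
}
```

Running the sketch reports the conflict between `/invalidPath` and `/path`, which is exactly the situation the assert added to parsePartitions rejects.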