Skip to content

Commit 91ade3f

Browse files
committed
Add test and more comments.
1 parent 2692bdb commit 91ade3f

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ private[sql] object PartitioningUtils {
8181
parsePartition(path, defaultPartitionName, typeInference)
8282
}.unzip
8383

84+
// We create pairs of (path -> path's partition value) here
85+
// If the corresponding partition value is None, the pair will be skiped
8486
val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
8587

8688
if (pathsWithPartitionValues.isEmpty) {
@@ -89,11 +91,21 @@ private[sql] object PartitioningUtils {
8991
} else {
9092
// This dataset is partitioned. We need to check whether all partitions have the same
9193
// partition columns and resolve potential type conflicts.
94+
95+
// Check if there is conflicting directory structure.
96+
// For the paths such as:
97+
// var paths = Seq(
98+
// "hdfs://host:9000/invalidPath",
99+
// "hdfs://host:9000/path/a=10/b=20",
100+
// "hdfs://host:9000/path/a=10.5/b=hello")
101+
// It will be recognised as conflicting directory structure:
102+
// "hdfs://host:9000/invalidPath"
103+
// "hdfs://host:9000/path"
92104
val basePaths = optBasePaths.flatMap(x => x)
93105
assert(
94106
basePaths.distinct.size == 1,
95107
"Conflicting directory structures detected. Suspicious paths:\b" +
96-
basePaths.mkString("\n\t", "\n\t", "\n\n"))
108+
basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
97109

98110
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
99111

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
8888
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
8989
}
9090
assert(exception.getMessage().contains("Conflicting directory structures detected"))
91+
92+
// Invalid
93+
// Conflicting directory structure:
94+
// "hdfs://host:9000/tmp/tables/partitionedTable"
95+
// "hdfs://host:9000/tmp/tables/nonPartitionedTable1"
96+
// "hdfs://host:9000/tmp/tables/nonPartitionedTable2"
97+
paths = Seq(
98+
"hdfs://host:9000/tmp/tables/partitionedTable",
99+
"hdfs://host:9000/tmp/tables/partitionedTable/p=1/",
100+
"hdfs://host:9000/tmp/tables/nonPartitionedTable1",
101+
"hdfs://host:9000/tmp/tables/nonPartitionedTable2")
102+
103+
exception = intercept[AssertionError] {
104+
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
105+
}
106+
assert(exception.getMessage().contains("Conflicting directory structures detected"))
91107
}
92108

93109
test("parse partition") {

0 commit comments

Comments
 (0)