Commit d6035d9

viirya authored and davies committed
[SPARK-10304] [SQL] Partition discovery should throw an exception if the dir structure is invalid
JIRA: https://issues.apache.org/jira/browse/SPARK-10304

This patch detects if the structure of partition directories is not valid. The test cases are from #8547. Thanks zhzhan. cc liancheng

Author: Liang-Chi Hsieh <[email protected]>

Closes #8840 from viirya/detect_invalid_part_dir.
1 parent 57446eb commit d6035d9
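
The idea behind the change can be sketched outside of Spark: derive each leaf directory's base path by stripping trailing column=value segments, then require that every leaf agrees on a single base. Below is a minimal, self-contained Scala sketch of that idea; the object and method names (PartitionLayoutCheck, checkConsistent, stripPartitionSegments) are illustrative only and do not appear in the patch.

object PartitionLayoutCheck {
  // Drop trailing "column=value" directory segments, keeping the base prefix.
  private def stripPartitionSegments(path: String): String = {
    val segments = path.stripSuffix("/").split("/")
    segments.reverse.dropWhile(_.contains("=")).reverse.mkString("/")
  }

  // Throws AssertionError if the paths do not share a single base directory,
  // mirroring the consistency check this commit adds to parsePartitions.
  def checkConsistent(paths: Seq[String]): Unit = {
    val basePaths = paths.map(stripPartitionSegments).distinct
    assert(
      basePaths.size == 1,
      "Conflicting directory structures detected. Suspicious paths:" +
        basePaths.mkString("\n\t", "\n\t", "\n"))
  }

  def main(args: Array[String]): Unit = {
    // Consistent layout: both leaves resolve to the base hdfs://host:9000/path.
    checkConsistent(Seq(
      "hdfs://host:9000/path/a=10/b=20",
      "hdfs://host:9000/path/a=10.5/b=hello"))

    // Inconsistent layout: "invalidPath" resolves to a different base than "path".
    checkConsistent(Seq(
      "hdfs://host:9000/invalidPath",
      "hdfs://host:9000/path/a=10/b=20"))  // throws AssertionError
  }
}

The real implementation derives the base path while walking up the directory tree in parsePartition, as the diff below shows, rather than by string manipulation.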

File tree

2 files changed: +59 -13 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
Lines changed: 25 additions & 11 deletions

@@ -77,16 +77,24 @@ private[sql] object PartitioningUtils {
       defaultPartitionName: String,
       typeInference: Boolean): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
-    val pathsWithPartitionValues = paths.flatMap { path =>
-      parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
-    }
+    val (partitionValues, optBasePaths) = paths.map { path =>
+      parsePartition(path, defaultPartitionName, typeInference)
+    }.unzip
+
+    val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))

     if (pathsWithPartitionValues.isEmpty) {
       // This dataset is not partitioned.
       PartitionSpec.emptySpec
     } else {
       // This dataset is partitioned. We need to check whether all partitions have the same
       // partition columns and resolve potential type conflicts.
+      val basePaths = optBasePaths.flatMap(x => x)
+      assert(
+        basePaths.distinct.size == 1,
+        "Conflicting directory structures detected. Suspicious paths:\b" +
+          basePaths.mkString("\n\t", "\n\t", "\n\n"))
+
       val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)

       // Creates the StructType which represents the partition columns.

@@ -110,12 +118,12 @@ private[sql] object PartitioningUtils {
   }

   /**
-   * Parses a single partition, returns column names and values of each partition column. For
-   * example, given:
+   * Parses a single partition, returns column names and values of each partition column, also
+   * the base path. For example, given:
    * {{{
    *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    * }}}
-   * it returns:
+   * it returns the partition:
    * {{{
    *   PartitionValues(
    *     Seq("a", "b", "c"),

@@ -124,34 +132,40 @@ private[sql] object PartitioningUtils {
    *       Literal.create("hello", StringType),
    *       Literal.create(3.14, FloatType)))
    * }}}
+   * and the base path:
+   * {{{
+   *   /path/to/partition
+   * }}}
    */
   private[sql] def parsePartition(
       path: Path,
       defaultPartitionName: String,
-      typeInference: Boolean): Option[PartitionValues] = {
+      typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
     var chopped = path
+    var basePath = path

     while (!finished) {
       // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
       // uncleaned. Here we simply ignore them.
       if (chopped.getName.toLowerCase == "_temporary") {
-        return None
+        return (None, None)
       }

       val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
       maybeColumn.foreach(columns += _)
+      basePath = chopped
       chopped = chopped.getParent
-      finished = maybeColumn.isEmpty || chopped.getParent == null
+      finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
     }

     if (columns.isEmpty) {
-      None
+      (None, Some(path))
     } else {
       val (columnNames, values) = columns.reverse.unzip
-      Some(PartitionValues(columnNames, values))
+      (Some(PartitionValues(columnNames, values)), Some(basePath))
     }
   }
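In the patched parsePartitions above, the line paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _)) pairs each input path with its optional parse result and keeps only the pairs whose parse succeeded. A small self-contained illustration of that zip/flatMap pattern follows, with plain Strings standing in for Hadoop Path and PartitionValues.

object ZipFlatMapDemo {
  def main(args: Array[String]): Unit = {
    // Strings stand in for Hadoop Paths; Option[String] stands in for Option[PartitionValues].
    val paths = Seq("/data/a=1", "/data/_temporary", "/data/a=2")
    val partitionValues: Seq[Option[String]] = Seq(Some("a=1"), None, Some("a=2"))

    // Same shape as the patched code: keep only the (path, values) pairs where parsing succeeded.
    val pathsWithPartitionValues =
      paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))

    println(pathsWithPartitionValues)  // List((/data/a=1,a=1), (/data/a=2,a=2))
  }
}

The unzip on the map over paths is what lets the new code keep the Option[Path] base paths for the consistency assertion while preserving the old pathsWithPartitionValues shape for the rest of the method.
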
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
Lines changed: 34 additions & 2 deletions

@@ -58,14 +58,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     check(defaultPartitionName, Literal.create(null, NullType))
   }

+  test("parse invalid partitioned directories") {
+    // Invalid
+    var paths = Seq(
+      "hdfs://host:9000/invalidPath",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello")
+
+    var exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures detected"))
+
+    // Valid
+    paths = Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/_temporary/path")
+
+    parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+
+    // Invalid
+    paths = Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/path1")
+
+    exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures detected"))
+  }
+
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
+      assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
     }

     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName, true).get
+        parsePartition(new Path(path), defaultPartitionName, true)
       }.getMessage

       assert(message.contains(expected))
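
The new test leans on ScalaTest's intercept to capture the AssertionError raised by the consistency check and to inspect its message. A minimal sketch of that pattern, assuming a recent ScalaTest (the import is org.scalatest.FunSuite on older releases); the suite and helper below are hypothetical stand-ins, not part of the patch.

import org.scalatest.funsuite.AnyFunSuite

class InterceptExampleSuite extends AnyFunSuite {
  // Hypothetical stand-in for the patched check: Scala's Predef.assert throws
  // java.lang.AssertionError when the condition is false.
  private def failLikePartitionCheck(): Unit =
    Predef.assert(false, "Conflicting directory structures detected. Suspicious paths:\n\t/a\n\t/b")

  test("intercept captures the AssertionError and exposes its message") {
    val exception = intercept[AssertionError] {
      failLikePartitionCheck()
    }
    assert(exception.getMessage.contains("Conflicting directory structures detected"))
  }
}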
