
Commit 79cef81

gengliangwang authored and HyukjinKwon committed
[SPARK-34075][SQL][CORE] Hidden directories are being listed for partition inference
### What changes were proposed in this pull request?

Fix a regression from #29959. In Spark, the following file paths are considered hidden and are ignored on file reads:

1. paths that start with "_" and don't contain "="
2. paths that start with "."

However, after the refactoring PR #29959, hidden paths are no longer filtered out during partition inference: https://github.com/apache/spark/pull/29959/files#r556432426

This PR fixes the bug. To achieve this, the method `InMemoryFileIndex.shouldFilterOut` is refactored into `HadoopFSUtils.shouldFilterOutPathName`.

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a bug when reading file paths with partitions.

### How was this patch tested?

Unit test.

Closes #31169 from gengliangwang/fileListingBug.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 467d758)
Signed-off-by: HyukjinKwon <[email protected]>
1 parent cb6655d, commit 79cef81

5 files changed: +69 additions, -28 deletions
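Before the per-file diffs, here is a minimal, self-contained Scala sketch of the hidden-path rules described in the commit message (an illustration, not Spark code; the real implementation is the shouldFilterOutPathName method added below):

object HiddenPathRulesSketch {
  // A name is hidden if it starts with "_" (and contains no "=", so partition
  // directories like "p=1" or "_col=1" survive) or starts with ".", or ends with
  // "._COPYING_" (an in-flight copy); the Parquet metadata files are exempt.
  def isHidden(name: String): Boolean = {
    val exclude = (name.startsWith("_") && !name.contains("=")) ||
      name.startsWith(".") || name.endsWith("._COPYING_")
    val include = name.startsWith("_common_metadata") || name.startsWith("_metadata")
    exclude && !include
  }

  def main(args: Array[String]): Unit = {
    assert(isHidden("_hidden_path"))   // leading "_" with no "=": filtered out
    assert(!isHidden("p=1"))           // partition directory: kept
    assert(!isHidden("_col=1"))        // leading "_" but contains "=": kept
    assert(!isHidden("_metadata"))     // Parquet metadata file: kept
    assert(isHidden(".part-0.crc"))    // leading ".": filtered out
  }
}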

core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala

Lines changed: 18 additions & 1 deletion
@@ -249,8 +249,11 @@ private[spark] object HadoopFSUtils extends Logging {
         Array.empty[FileStatus]
     }
 
+    val filteredStatuses =
+      statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName))
+
     val allLeafStatuses = {
-      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = contextOpt match {
         case Some(context) if dirs.size > parallelismThreshold =>
           parallelListLeafFilesInternal(
@@ -350,4 +353,18 @@ private[spark] object HadoopFSUtils extends Logging {
       modificationTime: Long,
       accessTime: Long,
       blockLocations: Array[SerializableBlockLocation])
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOutPathName(pathName: String): Boolean = {
+    // We filter out the following paths:
+    // 1. everything that starts with _ or ., except _common_metadata and _metadata,
+    //    because Parquet needs to find those metadata files among the leaf files
+    //    returned by this method. We should refactor this logic to not mix metadata
+    //    files with data files.
+    // 2. everything that ends with `._COPYING_`, because this is an intermediate
+    //    state of a file; we should skip it to avoid reading it twice.
+    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
+      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
+    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
+    exclude && !include
+  }
 }
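To see what the new filterNot line changes at listing time, here is a hedged sketch in plain Scala (Status stands in for Hadoop's FileStatus; isHidden is the same predicate as shouldFilterOutPathName above):

case class Status(name: String, isDirectory: Boolean)

def isHidden(name: String): Boolean =
  ((name.startsWith("_") && !name.contains("=")) || name.startsWith(".") ||
    name.endsWith("._COPYING_")) &&
    !(name.startsWith("_common_metadata") || name.startsWith("_metadata"))

val statuses = Seq(Status("p=0", true), Status("p=1", true), Status("_hidden_path", true))
val filteredStatuses = statuses.filterNot(s => isHidden(s.name))
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
// dirs == Seq(Status("p=0", true), Status("p=1", true));
// before this commit, _hidden_path would have reached partition inference.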
core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+
+class HadoopFSUtilsSuite extends SparkFunSuite {
+  test("HadoopFSUtils - file filtering") {
+    assert(!HadoopFSUtils.shouldFilterOutPathName("abcd"))
+    assert(HadoopFSUtils.shouldFilterOutPathName(".ab"))
+    assert(HadoopFSUtils.shouldFilterOutPathName("_cd"))
+    assert(!HadoopFSUtils.shouldFilterOutPathName("_metadata"))
+    assert(!HadoopFSUtils.shouldFilterOutPathName("_common_metadata"))
+    assert(HadoopFSUtils.shouldFilterOutPathName("_ab_metadata"))
+    assert(HadoopFSUtils.shouldFilterOutPathName("_cd_common_metadata"))
+    assert(HadoopFSUtils.shouldFilterOutPathName("a._COPYING_"))
+  }
+}
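A note on running just this new suite during development: something like the standard sbt invocation should work (hedged; the exact module and task names follow the usual Spark build conventions):

build/sbt "core/testOnly org.apache.spark.util.HadoopFSUtilsSuite"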

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 2 additions & 2 deletions

@@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
 
 /**
  * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -811,7 +811,7 @@ object DataSource extends Logging {
     val allPaths = globbedPaths ++ nonGlobPaths
     if (checkFilesExist) {
       val (filteredOut, filteredIn) = allPaths.partition { path =>
-        InMemoryFileIndex.shouldFilterOut(path.getName)
+        HadoopFSUtils.shouldFilterOutPathName(path.getName)
       }
       if (filteredIn.isEmpty) {
         logWarning(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

Lines changed: 1 addition & 14 deletions

@@ -158,23 +158,10 @@ object InMemoryFileIndex extends Logging {
       parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
   }
 
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter follow paths:
-    // 1. everything that starts with _ and ., except _common_metadata and _metadata
-    // because Parquet needs to find those metadata files from leaf files returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
-    // should skip this file in case of double reading.
-    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
-      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
-    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
-    exclude && !include
-  }
 }
 
 private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
   override def accept(path: Path): Boolean = {
-    (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
+    (filter == null || filter.accept(path)) && !HadoopFSUtils.shouldFilterOutPathName(path.getName)
   }
 }
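PathFilterWrapper is the hook where a user-supplied Hadoop PathFilter (for example, one derived from the pathGlobFilter read option, if I read the surrounding code right) gets combined with the shared hidden-path check; the && in accept means a custom filter can match a hidden name but never re-admit it. A hedged usage sketch (the path is illustrative):

val df = spark.read
  .option("pathGlobFilter", "*.parquet")  // user filter, AND-ed with the hidden-path check
  .parquet("/data/events")                // names like _hidden_path are still skipped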

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 15 additions & 11 deletions

@@ -297,17 +297,6 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
-  test("InMemoryFileIndex - file filtering") {
-    assert(!InMemoryFileIndex.shouldFilterOut("abcd"))
-    assert(InMemoryFileIndex.shouldFilterOut(".ab"))
-    assert(InMemoryFileIndex.shouldFilterOut("_cd"))
-    assert(!InMemoryFileIndex.shouldFilterOut("_metadata"))
-    assert(!InMemoryFileIndex.shouldFilterOut("_common_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("_ab_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("_cd_common_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("a._COPYING_"))
-  }
-
   test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
     class MockCatalog(
       override val rootPaths: Seq[Path])
@@ -416,6 +405,21 @@ class FileIndexSuite extends SharedSparkSession {
     fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
   }
 
+  test("SPARK-34075: InMemoryFileIndex filters out hidden file on partition inference") {
+    withTempPath { path =>
+      spark
+        .range(2)
+        .select(col("id").as("p"), col("id"))
+        .write
+        .partitionBy("p")
+        .parquet(path.getAbsolutePath)
+      val targetPath = new File(path, "p=1")
+      val hiddenPath = new File(path, "_hidden_path")
+      targetPath.renameTo(hiddenPath)
+      assert(spark.read.parquet(path.getAbsolutePath).count() == 1L)
+    }
+  }
+
   test("SPARK-20367 - properly unescape column names in inferPartitioning") {
     withTempPath { path =>
       val colToUnescape = "Column/#%'?"
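Translated into a user-level scenario (a hedged sketch mirroring the new regression test; the path is illustrative), the fix behaves as follows in something like spark-shell:

import org.apache.spark.sql.functions.col

spark.range(2)
  .select(col("id").as("p"), col("id"))
  .write
  .partitionBy("p")
  .parquet("/tmp/spark-34075")

// Rename one partition directory to a hidden-style name, e.g. from a shell:
//   mv /tmp/spark-34075/p=1 /tmp/spark-34075/_hidden_path

// With the fix, the hidden directory is ignored during partition inference,
// so only p=0 is read back:
assert(spark.read.parquet("/tmp/spark-34075").count() == 1L)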
