
Commit b463e6d

liancheng authored and yhuai committed
[SPARK-7868] [SQL] Ignores _temporary directories in HadoopFsRelation
So that potential partial/corrupted data files left by failed tasks/jobs won't affect normal data scans.

Author: Cheng Lian <[email protected]>

Closes #6411 from liancheng/spark-7868 and squashes the following commits:

273ea36 [Cheng Lian] Ignores _temporary directories
1 parent 0c33c7b commit b463e6d
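
For context on why such directories exist at all: Hadoop's FileOutputCommitter stages task output under a _temporary subdirectory of the job's output path and only moves files into their final location when the task/job commits. A task or job that dies mid-write can therefore leave behind a layout like the hypothetical one below, where only the top-level part file should be visible to readers:

    /data/table/part-r-00000.parquet                 committed, valid
    /data/table/_temporary/.../part-r-00001.parquet  partial, left by a failed task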

File tree: 2 files changed, +29 -7 lines


sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 13 additions & 7 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.sql.{Row, _}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * ::DeveloperApi::
@@ -378,24 +378,30 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
     def refresh(): Unit = {
+      // We don't filter files/directories whose name start with "_" or "." here, as specific data
+      // sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
+      // But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave
+      // partial/corrupted data files there.
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-        val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
-        files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+        if (status.getPath.getName.toLowerCase == "_temporary") {
+          Set.empty
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+          val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
+          files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+        }
       }
 
       leafFiles.clear()
 
-      // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
-      // may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
       val statuses = paths.flatMap { path =>
        val hdfsPath = new Path(path)
        val fs = hdfsPath.getFileSystem(hadoopConf)
        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
        Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
       }
 
-      val (dirs, files) = statuses.partition(_.isDir)
+      val files = statuses.filterNot(_.isDir)
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
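
To see the new traversal in isolation, here is a minimal standalone sketch of the same pruning idea, runnable against the local filesystem. The object name, the /tmp path, and the simplified Seq-based signature are illustrative assumptions, not code from this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object TemporaryDirPruningDemo {
  // Recursively collect leaf files, skipping any "_temporary" subtree,
  // mirroring the check added to listLeafFilesAndDirs above.
  def listLeafFiles(fs: FileSystem, status: FileStatus): Seq[FileStatus] =
    if (status.getPath.getName.toLowerCase == "_temporary") {
      Seq.empty // prune the whole subtree, whatever it contains
    } else if (status.isDir) {
      fs.listStatus(status.getPath).toSeq.flatMap(listLeafFiles(fs, _))
    } else {
      Seq(status)
    }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val root = new Path("/tmp/example-table") // hypothetical table directory; must exist
    listLeafFiles(fs, fs.getFileStatus(root)).foreach(s => println(s.getPath))
  }
}

Note that, as the moved comment explains, the patch deliberately keeps other underscore/dot-prefixed entries: data sources such as Parquet rely on files like _metadata and _common_metadata, so only the _temporary subtree is pruned.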

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 16 additions & 0 deletions
@@ -548,4 +548,20 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
     }
   }
+
+  test("SPARK-7868: _temporary directories should be ignored") {
+    withTempPath { dir =>
+      val df = Seq("a", "b", "c").zipWithIndex.toDF()
+
+      df.write
+        .format("parquet")
+        .save(dir.getCanonicalPath)
+
+      df.write
+        .format("parquet")
+        .save(s"${dir.getCanonicalPath}/_temporary")
+
+      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
+    }
+  }
 }
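
The test exercises the fix end to end: valid data goes into the table directory, an equally valid copy goes under _temporary, and the scan must return only the top-level rows. A hypothetical spark-shell session reproducing the same scenario (Spark 1.4-era API with sqlContext implicits in scope; the /tmp/t path is made up):

import sqlContext.implicits._

val df = Seq("a", "b", "c").zipWithIndex.toDF()
df.write.format("parquet").save("/tmp/t")
df.write.format("parquet").save("/tmp/t/_temporary")

// With the fix, files under /tmp/t/_temporary are skipped during the scan,
// so only the three top-level rows come back.
sqlContext.read.format("parquet").load("/tmp/t").count() // expected: 3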
