From 05f9228ef09c0a7e20d74ea4711cb922ba10ebf0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 10 May 2019 12:58:49 -0700 Subject: [PATCH 01/12] Only ignore FileNotFoundException when spark.sql.files.ignoreMissingFiles=true (default false) --- .../datasources/InMemoryFileIndex.scala | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index db55a06c10822..46faae0ca15dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -168,10 +168,14 @@ object InMemoryFileIndex extends Logging { filter: PathFilter, sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = { + val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles + // Short-circuits parallel listing when serial listing is likely to be faster. if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { return paths.map { path => - (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession))) + val leafFiles = listLeafFiles( + path, hadoopConf, filter, Some(sparkSession), ignoreMissingFiles = ignoreMissingFiles) + (path, leafFiles) } } @@ -204,7 +208,9 @@ object InMemoryFileIndex extends Logging { .mapPartitions { pathStrings => val hadoopConf = serializableConfiguration.value pathStrings.map(new Path(_)).toSeq.map { path => - (path, listLeafFiles(path, hadoopConf, filter, None)) + val leafFiles = listLeafFiles( + path, hadoopConf, filter, None, ignoreMissingFiles = ignoreMissingFiles) + (path, leafFiles) }.iterator }.map { case (path, statuses) => val serializableStatuses = statuses.map { status => @@ -267,15 +273,15 @@ object InMemoryFileIndex extends Logging { path: Path, hadoopConf: Configuration, filter: PathFilter, - sessionOpt: Option[SparkSession]): Seq[FileStatus] = { + sessionOpt: Option[SparkSession], + ignoreMissingFiles: Boolean): Seq[FileStatus] = { logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) - // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. val statuses = try fs.listStatus(path) catch { - case _: FileNotFoundException => + case _: FileNotFoundException if ignoreMissingFiles => logWarning(s"The directory $path was not found. 
Was it deleted very recently?") Array.empty[FileStatus] } @@ -288,7 +294,14 @@ object InMemoryFileIndex extends Logging { case Some(session) => bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2) case _ => - dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt)) + dirs.flatMap { dir => + listLeafFiles( + dir.getPath, + hadoopConf, + filter, + sessionOpt, + ignoreMissingFiles = ignoreMissingFiles) + } } val allFiles = topLevelFiles ++ nestedFiles if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles @@ -330,7 +343,7 @@ object InMemoryFileIndex extends Logging { } Some(lfs) } catch { - case _: FileNotFoundException => + case _: FileNotFoundException if ignoreMissingFiles => missingFiles += f.getPath.toString None } From a31c08a188a1b5529cde05c985c7efa273269af2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 May 2019 13:24:18 -0700 Subject: [PATCH 02/12] Update test cases to reflect behavior change --- .../datasources/FileIndexSuite.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index bc83f3d39d9d8..06df0ce411a2c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources -import java.io.File +import java.io.{File, FileNotFoundException} import java.net.URI import scala.collection.mutable @@ -167,14 +167,25 @@ class FileIndexSuite extends SharedSQLContext { } } - test("InMemoryFileIndex: folders that don't exist don't throw exceptions") { + test("InMemoryFileIndex: respects ignoreMissingFiles config") { withTempDir { dir => val deletedFolder = new File(dir, "deleted") assert(!deletedFolder.exists()) - val catalog1 = new InMemoryFileIndex( + + def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex( spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) - // doesn't throw an exception - assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty) + + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") { + intercept[FileNotFoundException] { + makeCatalog() + } + } + + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") { + val catalog = makeCatalog() + // doesn't throw an exception + assert(catalog.listLeafFiles(catalog.rootPaths).isEmpty) + } } } From 69f8db62e0caf5946e2338be8fa60443a041f5bb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 May 2019 17:05:52 -0700 Subject: [PATCH 03/12] Only non-root deletions should respect flag. 
--- .../sql/execution/command/CommandUtils.scala | 2 +- .../datasources/InMemoryFileIndex.scala | 43 ++++++++-- .../datasources/FileIndexSuite.scala | 84 ++++++++++++++++++- 3 files changed, 118 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 70e7cd9a1e40d..ed893c61e887d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -69,7 +69,7 @@ object CommandUtils extends Logging { } } val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles( - paths, sessionState.newHadoopConf(), pathFilter, spark) + paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true) fileStatusSeq.flatMap(_._2.map(_.getLen)).sum } else { partitions.map { p => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 46faae0ca15dd..4be3b4c6d9ffc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -124,7 +124,7 @@ class InMemoryFileIndex( } val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) val discovered = InMemoryFileIndex.bulkListLeafFiles( - pathsToFetch, hadoopConf, filter, sparkSession) + pathsToFetch, hadoopConf, filter, sparkSession, areRootPaths = true) discovered.foreach { case (path, leafFiles) => HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) fileStatusCache.putLeafFiles(path, leafFiles.toArray) @@ -166,7 +166,8 @@ object InMemoryFileIndex extends Logging { paths: Seq[Path], hadoopConf: Configuration, filter: PathFilter, - sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = { + sparkSession: SparkSession, + areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = { val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles @@ -174,7 +175,12 @@ object InMemoryFileIndex extends Logging { if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { return paths.map { path => val leafFiles = listLeafFiles( - path, hadoopConf, filter, Some(sparkSession), ignoreMissingFiles = ignoreMissingFiles) + path, + hadoopConf, + filter, + Some(sparkSession), + ignoreMissingFiles = ignoreMissingFiles, + isRootPath = areRootPaths) (path, leafFiles) } } @@ -209,7 +215,12 @@ object InMemoryFileIndex extends Logging { val hadoopConf = serializableConfiguration.value pathStrings.map(new Path(_)).toSeq.map { path => val leafFiles = listLeafFiles( - path, hadoopConf, filter, None, ignoreMissingFiles = ignoreMissingFiles) + path, + hadoopConf, + filter, + None, + ignoreMissingFiles = ignoreMissingFiles, + isRootPath = areRootPaths) (path, leafFiles) }.iterator }.map { case (path, statuses) => @@ -274,14 +285,23 @@ object InMemoryFileIndex extends Logging { hadoopConf: Configuration, filter: PathFilter, sessionOpt: Option[SparkSession], - ignoreMissingFiles: Boolean): Seq[FileStatus] = { + ignoreMissingFiles: Boolean, + isRootPath: Boolean): Seq[FileStatus] = { logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not 
include anything else recursively. val statuses = try fs.listStatus(path) catch { - case _: FileNotFoundException if ignoreMissingFiles => + // If we are listing a root path (e.g. the top level directory of a table), ignore + // missing files. This is necessary in order to be able to drop SessionCatalog tables + // when the table's root directory has been deleted (see discussion at SPARK-27676). + + // If we are NOT listing a root path then a FileNotFoundException here means that the + // directory was present in a previous round of file listing but is absent in this + // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3 + // list inconsistency). + case _: FileNotFoundException if isRootPath || ignoreMissingFiles => logWarning(s"The directory $path was not found. Was it deleted very recently?") Array.empty[FileStatus] } @@ -292,7 +312,13 @@ object InMemoryFileIndex extends Logging { val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) val nestedFiles: Seq[FileStatus] = sessionOpt match { case Some(session) => - bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2) + bulkListLeafFiles( + dirs.map(_.getPath), + hadoopConf, + filter, + session, + areRootPaths = false + ).flatMap(_._2) case _ => dirs.flatMap { dir => listLeafFiles( @@ -300,7 +326,8 @@ object InMemoryFileIndex extends Logging { hadoopConf, filter, sessionOpt, - ignoreMissingFiles = ignoreMissingFiles) + ignoreMissingFiles = ignoreMissingFiles, + isRootPath = false) } } val allFiles = topLevelFiles ++ nestedFiles diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 06df0ce411a2c..28d457da07441 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -167,13 +167,21 @@ class FileIndexSuite extends SharedSQLContext { } } - test("InMemoryFileIndex: respects ignoreMissingFiles config") { + test("InMemoryFileIndex: root folders that don't exist don't throw exceptions") { withTempDir { dir => val deletedFolder = new File(dir, "deleted") assert(!deletedFolder.exists()) + val catalog1 = new InMemoryFileIndex( + spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) + // doesn't throw an exception + assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty) + } + } + test("SPARK-27676: InMemoryFileIndex respects ignoreMissingFiles config for non-root paths") { + def doTest(): Unit = { def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex( - spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) + spark, Seq(DeletionRaceFileSystem.rootFolderPath), Map.empty, None) withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") { intercept[FileNotFoundException] { @@ -187,6 +195,22 @@ class FileIndexSuite extends SharedSQLContext { assert(catalog.listLeafFiles(catalog.rootPaths).isEmpty) } } + + withClue("test missing subdirectories") { + withSQLConf( + "fs.mockFs.impl" -> classOf[SubdirectoryDeletionRaceFileSystem].getName, + "fs.mockFs.impl.disable.cache" -> "true") { + doTest() + } + } + + withClue("test missing leaf files") { + withSQLConf( + "fs.mockFs.impl" -> classOf[FileDeletionRaceFileSystem].getName, + "fs.mockFs.impl.disable.cache" -> "true") { + doTest() + } + } } test("PartitioningAwareFileIndex listing parallelized with many top level 
dirs") { @@ -367,6 +391,62 @@ class FileIndexSuite extends SharedSQLContext { } +object DeletionRaceFileSystem { + val rootFolderPath: Path = new Path("mockFs:///rootFolder/") + val subFolderPath: Path = new Path(rootFolderPath, "subFolder") + val leafFilePath: Path = new Path(subFolderPath, "leafFile") + val rootListing: Array[FileStatus] = + Array(new FileStatus(0, true, 0, 0, 0, subFolderPath)) + val subFolderListing: Array[FileStatus] = + Array(new FileStatus(0, false, 0, 100, 0, leafFilePath)) +} + +// Used in SPARK-27676 test to simulate a race where a subdirectory is deleted +// between back-to-back listing calls. +class SubdirectoryDeletionRaceFileSystem extends RawLocalFileSystem { + import DeletionRaceFileSystem._ + + override def getScheme: String = "mockFs" + + override def listStatus(path: Path): Array[FileStatus] = { + if (path == rootFolderPath) { + println("LISTED ROOT") + rootListing + } else if (path == subFolderPath) { + println("LISTED SUBFOLDER") + Thread.dumpStack() + throw new FileNotFoundException() + } else { + throw new IllegalArgumentException() + } + } +} + +// Used in SPARK-27676 test to simulate a race where a file is deleted between +// being listed and having its size / file status checked. +class FileDeletionRaceFileSystem extends RawLocalFileSystem { + import DeletionRaceFileSystem._ + + override def getScheme: String = "mockFs" + + override def listStatus(path: Path): Array[FileStatus] = { + if (path == rootFolderPath) { + rootListing + } else if (path == subFolderPath) { + subFolderListing + } else { + throw new IllegalArgumentException() + } + } + + override def getFileBlockLocations( + file: FileStatus, + start: Long, + len: Long): Array[BlockLocation] = { + throw new FileNotFoundException() + } +} + class FakeParentPathFileSystem extends RawLocalFileSystem { override def getScheme: String = "mockFs" From 24ad834e262bdb5453379937400e76f0bbec4d85 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 May 2019 18:40:24 -0700 Subject: [PATCH 04/12] Remove debug code --- .../spark/sql/execution/datasources/FileIndexSuite.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 28d457da07441..4670448066f05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -410,11 +410,8 @@ class SubdirectoryDeletionRaceFileSystem extends RawLocalFileSystem { override def listStatus(path: Path): Array[FileStatus] = { if (path == rootFolderPath) { - println("LISTED ROOT") rootListing } else if (path == subFolderPath) { - println("LISTED SUBFOLDER") - Thread.dumpStack() throw new FileNotFoundException() } else { throw new IllegalArgumentException() From 400a02bb813ce706aa41985ba812b1526dacf326 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 May 2019 19:02:16 -0700 Subject: [PATCH 05/12] Fix indentation --- .../spark/sql/execution/datasources/FileIndexSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 4670448066f05..66e7e178b9fbe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala 
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -198,7 +198,7 @@ class FileIndexSuite extends SharedSQLContext { withClue("test missing subdirectories") { withSQLConf( - "fs.mockFs.impl" -> classOf[SubdirectoryDeletionRaceFileSystem].getName, + "fs.mockFs.impl" -> classOf[SubdirectoryDeletionRaceFileSystem].getName, "fs.mockFs.impl.disable.cache" -> "true") { doTest() } @@ -206,7 +206,7 @@ class FileIndexSuite extends SharedSQLContext { withClue("test missing leaf files") { withSQLConf( - "fs.mockFs.impl" -> classOf[FileDeletionRaceFileSystem].getName, + "fs.mockFs.impl" -> classOf[FileDeletionRaceFileSystem].getName, "fs.mockFs.impl.disable.cache" -> "true") { doTest() } From 0c1eba3ff813586973e6bfbb661f9cbe6686d5a0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 May 2019 19:30:11 -0700 Subject: [PATCH 06/12] Add note to migration guide --- docs/sql-migration-guide-upgrade.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 1aea26208694f..2b67091dcd8db 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -130,6 +130,8 @@ license: | - Since Spark 3.0, Spark will cast `String` to `Date/TimeStamp` in binary comparisons with dates/timestamps. The previous behaviour of casting `Date/Timestamp` to `String` can be restored by setting `spark.sql.legacy.typeCoercion.datetimeToString` to `true`. + - Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed further, due to either concurrent deletions or object store consistency issues) then the listing will fail with a `FileNotFoundException` unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these `FileNotFoundException`s would be logged as warnings and the missing files or subdirectories would be ignored. 
+ ## Upgrading from Spark SQL 2.4 to 2.4.1 - The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was From 88dc6b62b2b9360590e441733d6f52ce47d3ac7a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 May 2019 19:54:47 -0700 Subject: [PATCH 07/12] Test with parallel partition discovery --- .../datasources/FileIndexSuite.scala | 69 +++++++++++-------- 1 file changed, 40 insertions(+), 29 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 66e7e178b9fbe..eeadd4ed998da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem} +import org.apache.spark.SparkException import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util._ @@ -179,37 +180,47 @@ class FileIndexSuite extends SharedSQLContext { } test("SPARK-27676: InMemoryFileIndex respects ignoreMissingFiles config for non-root paths") { - def doTest(): Unit = { - def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex( - spark, Seq(DeletionRaceFileSystem.rootFolderPath), Map.empty, None) - - withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") { - intercept[FileNotFoundException] { - makeCatalog() + for ( + raceCondition <- Seq( + classOf[SubdirectoryDeletionRaceFileSystem], + classOf[FileDeletionRaceFileSystem] + ); + ignoreMissingFiles <- Seq(true, false); + parDiscoveryThreshold <- Seq(0, 100) + ) { + withClue(s"raceCondition=$raceCondition, ignoreMissingFiles=$ignoreMissingFiles, " + + s"parDiscoveryThreshold=$parDiscoveryThreshold" + ) { + withSQLConf( + SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissingFiles.toString, + SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> parDiscoveryThreshold.toString, + "fs.mockFs.impl" -> classOf[SubdirectoryDeletionRaceFileSystem].getName, + "fs.mockFs.impl.disable.cache" -> "true" + ) { + def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex( + spark, Seq(DeletionRaceFileSystem.rootFolderPath), Map.empty, None) + if (ignoreMissingFiles) { + // We're ignoring missing files, so catalog construction should succeed + val catalog = makeCatalog() + // The catalog should list no files because the only file or subdirectory was missing + assert(catalog.listLeafFiles(catalog.rootPaths).isEmpty) + } else { + // We're NOT ignoring missing files, so catalog construction should fail + val e = intercept[Exception] { + makeCatalog() + } + // The exact exception depends on whether we're using parallel listing + if (parDiscoveryThreshold == 0) { + // The FileNotFoundException occurs in a Spark executor (as part of a job) + assert(e.isInstanceOf[SparkException]) + assert(e.getMessage.contains("FileNotFoundException")) + } else { + // The FileNotFoundException occurs directly on the driver + assert(e.isInstanceOf[FileNotFoundException]) + } + } } } - - withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") { - val catalog = makeCatalog() - // doesn't throw an exception - assert(catalog.listLeafFiles(catalog.rootPaths).isEmpty) - } - } - - withClue("test missing subdirectories") { - withSQLConf( - "fs.mockFs.impl" -> 
classOf[SubdirectoryDeletionRaceFileSystem].getName, - "fs.mockFs.impl.disable.cache" -> "true") { - doTest() - } - } - - withClue("test missing leaf files") { - withSQLConf( - "fs.mockFs.impl" -> classOf[FileDeletionRaceFileSystem].getName, - "fs.mockFs.impl.disable.cache" -> "true") { - doTest() - } } } From 86c3a9da87c71b7618eaa6e61a82c529cae84bfb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 May 2019 20:02:33 -0700 Subject: [PATCH 08/12] Clarify migration guide comment --- docs/sql-migration-guide-upgrade.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 2b67091dcd8db..c5a06646bee4c 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -130,7 +130,7 @@ license: | - Since Spark 3.0, Spark will cast `String` to `Date/TimeStamp` in binary comparisons with dates/timestamps. The previous behaviour of casting `Date/Timestamp` to `String` can be restored by setting `spark.sql.legacy.typeCoercion.datetimeToString` to `true`. - - Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed further, due to either concurrent deletions or object store consistency issues) then the listing will fail with a `FileNotFoundException` unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these `FileNotFoundException`s would be logged as warnings and the missing files or subdirectories would be ignored. + - Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed during later phases of the recursive directory listing, due to either concurrent file deletions or object store consistency issues) then the listing will fail with an exception unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these missing files or subdirectories would be ignored. Note that this change of behavior only applies during initial table file listing (or during `REFRESH TABLE`), not during query execution: the net change is that `spark.sql.files.ignoreMissingFiles` is now obeyed during table file listing / query planning, not only at query execution time. 
## Upgrading from Spark SQL 2.4 to 2.4.1 From 42e8b9881d2654150a92bbc7fc341904f4862900 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 May 2019 23:55:08 -0700 Subject: [PATCH 09/12] Strengthen test assertions further (to fix bug in tests and guard against case discussed during #24672 review) --- .../datasources/FileIndexSuite.scala | 53 +++++++++++++------ 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index eeadd4ed998da..2a5c5a2dd0ff8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -180,6 +180,7 @@ class FileIndexSuite extends SharedSQLContext { } test("SPARK-27676: InMemoryFileIndex respects ignoreMissingFiles config for non-root paths") { + import DeletionRaceFileSystem._ for ( raceCondition <- Seq( classOf[SubdirectoryDeletionRaceFileSystem], @@ -194,16 +195,24 @@ class FileIndexSuite extends SharedSQLContext { withSQLConf( SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissingFiles.toString, SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> parDiscoveryThreshold.toString, - "fs.mockFs.impl" -> classOf[SubdirectoryDeletionRaceFileSystem].getName, + "fs.mockFs.impl" -> raceCondition.getName, "fs.mockFs.impl.disable.cache" -> "true" ) { def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex( - spark, Seq(DeletionRaceFileSystem.rootFolderPath), Map.empty, None) + spark, Seq(rootDirPath), Map.empty, None) if (ignoreMissingFiles) { // We're ignoring missing files, so catalog construction should succeed val catalog = makeCatalog() - // The catalog should list no files because the only file or subdirectory was missing - assert(catalog.listLeafFiles(catalog.rootPaths).isEmpty) + val leafFiles = catalog.listLeafFiles(catalog.rootPaths) + if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) { + // The only subdirectory was missing, so there should be no leaf files: + assert(leafFiles.isEmpty) + } else { + assert(raceCondition == classOf[FileDeletionRaceFileSystem]) + // One of the two leaf files was missing, but we should still list the other: + assert(leafFiles.size == 1) + assert(leafFiles.head.getPath == nonDeletedLeafFilePath) + } } else { // We're NOT ignoring missing files, so catalog construction should fail val e = intercept[Exception] { @@ -217,6 +226,13 @@ class FileIndexSuite extends SharedSQLContext { } else { // The FileNotFoundException occurs directly on the driver assert(e.isInstanceOf[FileNotFoundException]) + // Test that the FileNotFoundException is triggered for the expected reason: + if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) { + assert(e.getMessage.contains(subDirPath.toString)) + } else { + assert(raceCondition == classOf[FileDeletionRaceFileSystem]) + assert(e.getMessage.contains(leafFilePath.toString)) + } } } } @@ -403,13 +419,16 @@ class FileIndexSuite extends SharedSQLContext { } object DeletionRaceFileSystem { - val rootFolderPath: Path = new Path("mockFs:///rootFolder/") - val subFolderPath: Path = new Path(rootFolderPath, "subFolder") - val leafFilePath: Path = new Path(subFolderPath, "leafFile") + val rootDirPath: Path = new Path("mockFs:///rootDir/") + val subDirPath: Path = new Path(rootDirPath, "subDir") + val leafFilePath: Path = new Path(subDirPath, "leafFile") + val 
nonDeletedLeafFilePath: Path = new Path(subDirPath, "nonDeletedLeafFile") val rootListing: Array[FileStatus] = - Array(new FileStatus(0, true, 0, 0, 0, subFolderPath)) + Array(new FileStatus(0, true, 0, 0, 0, subDirPath)) val subFolderListing: Array[FileStatus] = - Array(new FileStatus(0, false, 0, 100, 0, leafFilePath)) + Array( + new FileStatus(0, false, 0, 100, 0, leafFilePath), + new FileStatus(0, false, 0, 100, 0, nonDeletedLeafFilePath)) } // Used in SPARK-27676 test to simulate a race where a subdirectory is deleted @@ -420,10 +439,10 @@ class SubdirectoryDeletionRaceFileSystem extends RawLocalFileSystem { override def getScheme: String = "mockFs" override def listStatus(path: Path): Array[FileStatus] = { - if (path == rootFolderPath) { + if (path == rootDirPath) { rootListing - } else if (path == subFolderPath) { - throw new FileNotFoundException() + } else if (path == subDirPath) { + throw new FileNotFoundException(subDirPath.toString) } else { throw new IllegalArgumentException() } @@ -438,9 +457,9 @@ class FileDeletionRaceFileSystem extends RawLocalFileSystem { override def getScheme: String = "mockFs" override def listStatus(path: Path): Array[FileStatus] = { - if (path == rootFolderPath) { + if (path == rootDirPath) { rootListing - } else if (path == subFolderPath) { + } else if (path == subDirPath) { subFolderListing } else { throw new IllegalArgumentException() @@ -451,7 +470,11 @@ class FileDeletionRaceFileSystem extends RawLocalFileSystem { file: FileStatus, start: Long, len: Long): Array[BlockLocation] = { - throw new FileNotFoundException() + if (file.getPath == leafFilePath) { + throw new FileNotFoundException(leafFilePath.toString) + } else { + Array.empty + } } } From 2a6240b761f942d4c79bcf1935ebd89ce04d2232 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 May 2019 19:17:17 -0700 Subject: [PATCH 10/12] Work in progress towards fixing races for root file deletion --- .../sql/execution/command/CommandUtils.scala | 7 ++- .../datasources/InMemoryFileIndex.scala | 61 ++++++++++++------- .../datasources/FileIndexSuite.scala | 19 ++++-- .../datasources/FileSourceStrategySuite.scala | 2 +- 4 files changed, 61 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index ed893c61e887d..e535155c0a4a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -69,7 +69,12 @@ object CommandUtils extends Logging { } } val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles( - paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true) + paths, + sessionState.newHadoopConf(), + pathFilter, + spark, + isRefresh = true, + isNestedListing = false) fileStatusSeq.flatMap(_._2.map(_.getLen)).sum } else { partitions.map { p => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 4be3b4c6d9ffc..f1d41457317f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -64,7 +64,7 @@ class InMemoryFileIndex( @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ @volatile 
private var cachedPartitionSpec: PartitionSpec = _ - refresh0() + buildIndex(isRefresh = false) override def partitionSpec(): PartitionSpec = { if (cachedPartitionSpec == null) { @@ -84,11 +84,11 @@ class InMemoryFileIndex( override def refresh(): Unit = { fileStatusCache.invalidateAll() - refresh0() + buildIndex(isRefresh = true) } - private def refresh0(): Unit = { - val files = listLeafFiles(rootPaths) + private def buildIndex(isRefresh: Boolean): Unit = { + val files = listLeafFiles(rootPaths, isRefresh = isRefresh) cachedLeafFiles = new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) @@ -108,8 +108,12 @@ class InMemoryFileIndex( * discovery threshold. * * This is publicly visible for testing. + * + * @param paths the root paths to list + * @param isRefresh true if we are listing due to an explicit refresh(), + * false if we are listing in the constructor. */ - def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + def listLeafFiles(paths: Seq[Path], isRefresh: Boolean): mutable.LinkedHashSet[FileStatus] = { val output = mutable.LinkedHashSet[FileStatus]() val pathsToFetch = mutable.ArrayBuffer[Path]() for (path <- paths) { @@ -124,7 +128,12 @@ class InMemoryFileIndex( } val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) val discovered = InMemoryFileIndex.bulkListLeafFiles( - pathsToFetch, hadoopConf, filter, sparkSession, areRootPaths = true) + pathsToFetch, + hadoopConf, + filter, + sparkSession, + isRefresh = isRefresh, + isNestedListing = false) // since we are starting from a root path discovered.foreach { case (path, leafFiles) => HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) fileStatusCache.putLeafFiles(path, leafFiles.toArray) @@ -167,7 +176,8 @@ object InMemoryFileIndex extends Logging { hadoopConf: Configuration, filter: PathFilter, sparkSession: SparkSession, - areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = { + isRefresh: Boolean, + isNestedListing: Boolean): Seq[(Path, Seq[FileStatus])] = { val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles @@ -180,7 +190,8 @@ object InMemoryFileIndex extends Logging { filter, Some(sparkSession), ignoreMissingFiles = ignoreMissingFiles, - isRootPath = areRootPaths) + isRefresh = isRefresh, + isNestedListing = isNestedListing) (path, leafFiles) } } @@ -220,7 +231,8 @@ object InMemoryFileIndex extends Logging { filter, None, ignoreMissingFiles = ignoreMissingFiles, - isRootPath = areRootPaths) + isRefresh = isRefresh, + isNestedListing = isNestedListing) (path, leafFiles) }.iterator }.map { case (path, statuses) => @@ -286,22 +298,27 @@ object InMemoryFileIndex extends Logging { filter: PathFilter, sessionOpt: Option[SparkSession], ignoreMissingFiles: Boolean, - isRootPath: Boolean): Seq[FileStatus] = { + isRefresh: Boolean, + isNestedListing: Boolean): Seq[FileStatus] = { logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. val statuses = try fs.listStatus(path) catch { - // If we are listing a root path (e.g. the top level directory of a table), ignore - // missing files. This is necessary in order to be able to drop SessionCatalog tables - // when the table's root directory has been deleted (see discussion at SPARK-27676). 
- - // If we are NOT listing a root path then a FileNotFoundException here means that the - // directory was present in a previous round of file listing but is absent in this - // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3 - // list inconsistency). - case _: FileNotFoundException if isRootPath || ignoreMissingFiles => + // If we are building this index for the first time (in the InMemoryFileIndex constructor, + // where isRefresh = false) then presumably every path passed to the constructor must + // have existed, so any FileNotFoundExceptions likely indicate a serious race condition. + + // However, if we're rebuilding the index due to an explicit refresh() then we need to + // ignore deleted root directories in order to preserve existing behavior and to allow + // SessionCatalog tables to be dropped when root directories have been deleted; see + // discussion on SPARK-27676's PR for more details. + + // If we are NOT listing a root path (isNestedListing = true) then a FileNotFoundException + // here means that the directory was present in a previous round of file listing but is + // absent in this listing, indicating a race condition). + case _: FileNotFoundException if (isRefresh && !isNestedListing) || ignoreMissingFiles => logWarning(s"The directory $path was not found. Was it deleted very recently?") Array.empty[FileStatus] } @@ -317,7 +334,8 @@ object InMemoryFileIndex extends Logging { hadoopConf, filter, session, - areRootPaths = false + isRefresh = isRefresh, + isNestedListing = true ).flatMap(_._2) case _ => dirs.flatMap { dir => @@ -327,7 +345,8 @@ object InMemoryFileIndex extends Logging { filter, sessionOpt, ignoreMissingFiles = ignoreMissingFiles, - isRootPath = false) + isRefresh = isRefresh, + isNestedListing = true) } } val allFiles = topLevelFiles ++ nestedFiles diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 2a5c5a2dd0ff8..28557a58d1985 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -168,14 +168,23 @@ class FileIndexSuite extends SharedSQLContext { } } - test("InMemoryFileIndex: root folders that don't exist don't throw exceptions") { + test("SPARK-27676: InMemoryFileIndex: root path that don't exist don't throw exceptions") { withTempDir { dir => val deletedFolder = new File(dir, "deleted") - assert(!deletedFolder.exists()) - val catalog1 = new InMemoryFileIndex( + // If a root path doesn't exist when InMemoryFileIndex is being constructed then + // index construction should fail + intercept[FileNotFoundException] { + new InMemoryFileIndex(spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) + } + // If a root folder exists at InMemoryFileIndex construction time and then is subsequently + // deleted then refresh() should still work + deletedFolder.mkdir() + val catalog = new InMemoryFileIndex( spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) + deletedFolder.delete() + catalog.refresh() // doesn't throw an exception - assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty) + assert(catalog.listLeafFiles(catalog.rootPaths, isRefresh = true).isEmpty) } } @@ -203,7 +212,7 @@ class FileIndexSuite extends SharedSQLContext { if (ignoreMissingFiles) { // We're ignoring missing files, so catalog 
construction should succeed val catalog = makeCatalog() - val leafFiles = catalog.listLeafFiles(catalog.rootPaths) + val leafFiles = catalog.listLeafFiles(catalog.rootPaths, isRefresh = false) if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) { // The only subdirectory was missing, so there should be no leaf files: assert(leafFiles.isEmpty) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index b38f0f7f228a4..aa66bc8dece92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -403,7 +403,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi parameters = Map.empty[String, String], userSpecifiedSchema = None) // This should not fail. - fileCatalog.listLeafFiles(Seq(new Path(tempDir))) + fileCatalog.listLeafFiles(Seq(new Path(tempDir)), isRefresh = false) // Also have an integration test. checkAnswer( From 97bac912fdfc3ce68c61b90f65f2e8857a920496 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 May 2019 19:20:01 -0700 Subject: [PATCH 11/12] Revert "Work in progress towards fixing races for root file deletion" This reverts commit 2a6240b761f942d4c79bcf1935ebd89ce04d2232. --- .../sql/execution/command/CommandUtils.scala | 7 +-- .../datasources/InMemoryFileIndex.scala | 61 +++++++------------ .../datasources/FileIndexSuite.scala | 19 ++---- .../datasources/FileSourceStrategySuite.scala | 2 +- 4 files changed, 28 insertions(+), 61 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index e535155c0a4a5..ed893c61e887d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -69,12 +69,7 @@ object CommandUtils extends Logging { } } val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles( - paths, - sessionState.newHadoopConf(), - pathFilter, - spark, - isRefresh = true, - isNestedListing = false) + paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true) fileStatusSeq.flatMap(_._2.map(_.getLen)).sum } else { partitions.map { p => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index f1d41457317f9..4be3b4c6d9ffc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -64,7 +64,7 @@ class InMemoryFileIndex( @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ @volatile private var cachedPartitionSpec: PartitionSpec = _ - buildIndex(isRefresh = false) + refresh0() override def partitionSpec(): PartitionSpec = { if (cachedPartitionSpec == null) { @@ -84,11 +84,11 @@ class InMemoryFileIndex( override def refresh(): Unit = { fileStatusCache.invalidateAll() - buildIndex(isRefresh = true) + refresh0() } - private def buildIndex(isRefresh: Boolean): Unit = { - val files = listLeafFiles(rootPaths, isRefresh = isRefresh) + private def 
refresh0(): Unit = { + val files = listLeafFiles(rootPaths) cachedLeafFiles = new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) @@ -108,12 +108,8 @@ class InMemoryFileIndex( * discovery threshold. * * This is publicly visible for testing. - * - * @param paths the root paths to list - * @param isRefresh true if we are listing due to an explicit refresh(), - * false if we are listing in the constructor. */ - def listLeafFiles(paths: Seq[Path], isRefresh: Boolean): mutable.LinkedHashSet[FileStatus] = { + def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { val output = mutable.LinkedHashSet[FileStatus]() val pathsToFetch = mutable.ArrayBuffer[Path]() for (path <- paths) { @@ -128,12 +124,7 @@ class InMemoryFileIndex( } val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) val discovered = InMemoryFileIndex.bulkListLeafFiles( - pathsToFetch, - hadoopConf, - filter, - sparkSession, - isRefresh = isRefresh, - isNestedListing = false) // since we are starting from a root path + pathsToFetch, hadoopConf, filter, sparkSession, areRootPaths = true) discovered.foreach { case (path, leafFiles) => HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) fileStatusCache.putLeafFiles(path, leafFiles.toArray) @@ -176,8 +167,7 @@ object InMemoryFileIndex extends Logging { hadoopConf: Configuration, filter: PathFilter, sparkSession: SparkSession, - isRefresh: Boolean, - isNestedListing: Boolean): Seq[(Path, Seq[FileStatus])] = { + areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = { val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles @@ -190,8 +180,7 @@ object InMemoryFileIndex extends Logging { filter, Some(sparkSession), ignoreMissingFiles = ignoreMissingFiles, - isRefresh = isRefresh, - isNestedListing = isNestedListing) + isRootPath = areRootPaths) (path, leafFiles) } } @@ -231,8 +220,7 @@ object InMemoryFileIndex extends Logging { filter, None, ignoreMissingFiles = ignoreMissingFiles, - isRefresh = isRefresh, - isNestedListing = isNestedListing) + isRootPath = areRootPaths) (path, leafFiles) }.iterator }.map { case (path, statuses) => @@ -298,27 +286,22 @@ object InMemoryFileIndex extends Logging { filter: PathFilter, sessionOpt: Option[SparkSession], ignoreMissingFiles: Boolean, - isRefresh: Boolean, - isNestedListing: Boolean): Seq[FileStatus] = { + isRootPath: Boolean): Seq[FileStatus] = { logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. val statuses = try fs.listStatus(path) catch { - // If we are building this index for the first time (in the InMemoryFileIndex constructor, - // where isRefresh = false) then presumably every path passed to the constructor must - // have existed, so any FileNotFoundExceptions likely indicate a serious race condition. - - // However, if we're rebuilding the index due to an explicit refresh() then we need to - // ignore deleted root directories in order to preserve existing behavior and to allow - // SessionCatalog tables to be dropped when root directories have been deleted; see - // discussion on SPARK-27676's PR for more details. 
- - // If we are NOT listing a root path (isNestedListing = true) then a FileNotFoundException - // here means that the directory was present in a previous round of file listing but is - // absent in this listing, indicating a race condition). - case _: FileNotFoundException if (isRefresh && !isNestedListing) || ignoreMissingFiles => + // If we are listing a root path (e.g. the top level directory of a table), ignore + // missing files. This is necessary in order to be able to drop SessionCatalog tables + // when the table's root directory has been deleted (see discussion at SPARK-27676). + + // If we are NOT listing a root path then a FileNotFoundException here means that the + // directory was present in a previous round of file listing but is absent in this + // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3 + // list inconsistency). + case _: FileNotFoundException if isRootPath || ignoreMissingFiles => logWarning(s"The directory $path was not found. Was it deleted very recently?") Array.empty[FileStatus] } @@ -334,8 +317,7 @@ object InMemoryFileIndex extends Logging { hadoopConf, filter, session, - isRefresh = isRefresh, - isNestedListing = true + areRootPaths = false ).flatMap(_._2) case _ => dirs.flatMap { dir => @@ -345,8 +327,7 @@ object InMemoryFileIndex extends Logging { filter, sessionOpt, ignoreMissingFiles = ignoreMissingFiles, - isRefresh = isRefresh, - isNestedListing = true) + isRootPath = false) } } val allFiles = topLevelFiles ++ nestedFiles diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 28557a58d1985..2a5c5a2dd0ff8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -168,23 +168,14 @@ class FileIndexSuite extends SharedSQLContext { } } - test("SPARK-27676: InMemoryFileIndex: root path that don't exist don't throw exceptions") { + test("InMemoryFileIndex: root folders that don't exist don't throw exceptions") { withTempDir { dir => val deletedFolder = new File(dir, "deleted") - // If a root path doesn't exist when InMemoryFileIndex is being constructed then - // index construction should fail - intercept[FileNotFoundException] { - new InMemoryFileIndex(spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) - } - // If a root folder exists at InMemoryFileIndex construction time and then is subsequently - // deleted then refresh() should still work - deletedFolder.mkdir() - val catalog = new InMemoryFileIndex( + assert(!deletedFolder.exists()) + val catalog1 = new InMemoryFileIndex( spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) - deletedFolder.delete() - catalog.refresh() // doesn't throw an exception - assert(catalog.listLeafFiles(catalog.rootPaths, isRefresh = true).isEmpty) + assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty) } } @@ -212,7 +203,7 @@ class FileIndexSuite extends SharedSQLContext { if (ignoreMissingFiles) { // We're ignoring missing files, so catalog construction should succeed val catalog = makeCatalog() - val leafFiles = catalog.listLeafFiles(catalog.rootPaths, isRefresh = false) + val leafFiles = catalog.listLeafFiles(catalog.rootPaths) if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) { // The only subdirectory was missing, so there should be no leaf files: 
assert(leafFiles.isEmpty) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index aa66bc8dece92..b38f0f7f228a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -403,7 +403,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi parameters = Map.empty[String, String], userSpecifiedSchema = None) // This should not fail. - fileCatalog.listLeafFiles(Seq(new Path(tempDir)), isRefresh = false) + fileCatalog.listLeafFiles(Seq(new Path(tempDir))) // Also have an integration test. checkAnswer( From 58e95445579a17d5d457dfc7678893d481f0b6f7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 May 2019 19:33:42 -0700 Subject: [PATCH 12/12] Update comment to clarify exception for root paths --- .../datasources/InMemoryFileIndex.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 4be3b4c6d9ffc..fe89bf2c77324 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -293,14 +293,25 @@ object InMemoryFileIndex extends Logging { // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. val statuses = try fs.listStatus(path) catch { - // If we are listing a root path (e.g. the top level directory of a table), ignore - // missing files. This is necessary in order to be able to drop SessionCatalog tables - // when the table's root directory has been deleted (see discussion at SPARK-27676). - + // If we are listing a root path (e.g. a top level directory of a table), we need to + // ignore FileNotFoundExceptions during this root level of the listing because + // + // (a) certain code paths might construct an InMemoryFileIndex with root paths that + // might not exist (i.e. not all callers are guaranteed to have checked + // path existence prior to constructing InMemoryFileIndex) and, + // (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break + // existing behavior and break the ability drop SessionCatalog tables when tables' + // root directories have been deleted (which breaks a number of Spark's own tests). + // // If we are NOT listing a root path then a FileNotFoundException here means that the - // directory was present in a previous round of file listing but is absent in this + // directory was present in a previous level of file listing but is absent in this // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3 // list inconsistency). + // + // The trade-off in supporting existing behaviors / use-cases is that we won't be + // able to detect race conditions involving root paths being deleted during + // InMemoryFileIndex construction. However, it's still a net improvement to detect and + // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion. 
case _: FileNotFoundException if isRootPath || ignoreMissingFiles => logWarning(s"The directory $path was not found. Was it deleted very recently?") Array.empty[FileStatus]
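
The patch series above only changes when listing-time FileNotFoundExceptions are swallowed; it reuses the existing `spark.sql.files.ignoreMissingFiles` flag rather than introducing a new one. A minimal usage sketch follows (not part of the patch; the table path and app name are placeholders) showing how a caller would opt back into the lenient pre-3.0 behavior described in the migration-guide note, where files or subdirectories that disappear mid-listing are skipped instead of failing the job.

import org.apache.spark.sql.SparkSession

object IgnoreMissingFilesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ignore-missing-files-example") // placeholder name
      .master("local[*]")
      // Default is false: after this change, a file or subdirectory that vanishes
      // during recursive listing (e.g. a concurrent overwrite or S3 list
      // inconsistency) makes planning fail fast with a FileNotFoundException.
      // Setting it to true restores the old behavior of logging a warning and
      // skipping the missing entry, both at listing time and at read time.
      .config("spark.sql.files.ignoreMissingFiles", "true")
      .getOrCreate()

    // Listing / query planning over a directory tree whose children may be
    // deleted concurrently now simply omits the missing entries.
    val df = spark.read.parquet("/path/to/table") // placeholder path
    println(df.count())

    spark.stop()
  }
}

Note that, per the comments in the final patch, root paths passed to the InMemoryFileIndex constructor are still tolerated when missing regardless of this flag, so that dropping a SessionCatalog table whose root directory has been deleted continues to work.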