diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index b062a0436e435..c920f0641cad7 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -145,6 +145,8 @@ license: |
 
   - Since Spark 3.0, a higher-order function `exists` follows the three-valued boolean logic, i.e., if the `predicate` returns any `null`s and no `true` is obtained, then `exists` will return `null` instead of `false`. For example, `exists(array(1, null, 3), x -> x % 2 == 0)` will be `null`. The previous behaviour can be restored by setting `spark.sql.legacy.arrayExistsFollowsThreeValuedLogic` to `false`.
 
+  - Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed during later phases of the recursive directory listing, due to either concurrent file deletions or object store consistency issues) then the listing will fail with an exception unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these missing files or subdirectories would be ignored. Note that this change of behavior only applies during initial table file listing (or during `REFRESH TABLE`), not during query execution: the net change is that `spark.sql.files.ignoreMissingFiles` is now obeyed during table file listing / query planning, not only at query execution time.
+
 ## Upgrading from Spark SQL 2.4 to 2.4.1
 
   - The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index cac2519614789..b644e6dc471d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -67,7 +67,7 @@ object CommandUtils extends Logging {
         override def accept(path: Path): Boolean = isDataPath(path, stagingDir)
       }
       val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
-        paths, sessionState.newHadoopConf(), pathFilter, spark)
+        paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true)
       fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
     } else {
       partitions.map { p =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 45cf924266257..cf7a13050f66c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -125,7 +125,7 @@ class InMemoryFileIndex(
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch, hadoopConf, filter, sparkSession)
+      pathsToFetch, hadoopConf, filter, sparkSession, areRootPaths = true)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -167,12 +167,22 @@ object InMemoryFileIndex extends Logging {
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+      sparkSession: SparkSession,
+      areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
+
+    val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
-        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+        val leafFiles = listLeafFiles(
+          path,
+          hadoopConf,
+          filter,
+          Some(sparkSession),
+          ignoreMissingFiles = ignoreMissingFiles,
+          isRootPath = areRootPaths)
+        (path, leafFiles)
       }
     }
 
@@ -205,7 +215,14 @@ object InMemoryFileIndex extends Logging {
       .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
         pathStrings.map(new Path(_)).toSeq.map { path =>
-          (path, listLeafFiles(path, hadoopConf, filter, None))
+          val leafFiles = listLeafFiles(
+            path,
+            hadoopConf,
+            filter,
+            None,
+            ignoreMissingFiles = ignoreMissingFiles,
+            isRootPath = areRootPaths)
+          (path, leafFiles)
         }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
@@ -268,11 +285,12 @@ object InMemoryFileIndex extends Logging {
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
-      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
+      sessionOpt: Option[SparkSession],
+      ignoreMissingFiles: Boolean,
+      isRootPath: Boolean): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
 
-    // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
     // Note that statuses only include FileStatus for the files and dirs directly under path,
     // and does not include anything else recursively.
     val statuses: Array[FileStatus] = try {
@@ -290,7 +308,26 @@ object InMemoryFileIndex extends Logging {
         case _ => fs.listStatus(path)
       }
     } catch {
-      case _: FileNotFoundException =>
+      // If we are listing a root path (e.g. a top level directory of a table), we need to
+      // ignore FileNotFoundExceptions during this root level of the listing because
+      //
+      //  (a) certain code paths might construct an InMemoryFileIndex with root paths that
+      //      might not exist (i.e. not all callers are guaranteed to have checked
+      //      path existence prior to constructing InMemoryFileIndex) and,
+      //  (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break
+      //      existing behavior and break the ability to drop SessionCatalog tables when
+      //      tables' root directories have been deleted (which breaks a number of Spark's
+      //      own tests).
+      //
+      // If we are NOT listing a root path then a FileNotFoundException here means that the
+      // directory was present in a previous level of file listing but is absent in this
+      // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3
+      // list inconsistency).
+      //
+      // The trade-off in supporting existing behaviors / use-cases is that we won't be
+      // able to detect race conditions involving root paths being deleted during
+      // InMemoryFileIndex construction. However, it's still a net improvement to detect and
+      // fail fast on the non-root cases. For more info see the SPARK-27676 review discussion.
+      case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
         logWarning(s"The directory $path was not found. Was it deleted very recently?")
         Array.empty[FileStatus]
     }
@@ -301,9 +338,23 @@ object InMemoryFileIndex extends Logging {
     val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
     val nestedFiles: Seq[FileStatus] = sessionOpt match {
       case Some(session) =>
-        bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+        bulkListLeafFiles(
+          dirs.map(_.getPath),
+          hadoopConf,
+          filter,
+          session,
+          areRootPaths = false
+        ).flatMap(_._2)
       case _ =>
-        dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+        dirs.flatMap { dir =>
+          listLeafFiles(
+            dir.getPath,
+            hadoopConf,
+            filter,
+            sessionOpt,
+            ignoreMissingFiles = ignoreMissingFiles,
+            isRootPath = false)
+        }
     }
     val allFiles = topLevelFiles ++ nestedFiles
     if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
@@ -345,7 +396,7 @@ object InMemoryFileIndex extends Logging {
           }
           Some(lfs)
         } catch {
-          case _: FileNotFoundException =>
+          case _: FileNotFoundException if ignoreMissingFiles =>
            missingFiles += f.getPath.toString
            None
        }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index bc83f3d39d9d8..2a5c5a2dd0ff8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.net.URI
 
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}
 
+import org.apache.spark.SparkException
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util._
@@ -167,7 +168,7 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
-  test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
+  test("InMemoryFileIndex: root folders that don't exist don't throw exceptions") {
     withTempDir { dir =>
       val deletedFolder = new File(dir, "deleted")
       assert(!deletedFolder.exists())
@@ -178,6 +179,67 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-27676: InMemoryFileIndex respects ignoreMissingFiles config for non-root paths") {
+    import DeletionRaceFileSystem._
+    for (
+      raceCondition <- Seq(
+        classOf[SubdirectoryDeletionRaceFileSystem],
+        classOf[FileDeletionRaceFileSystem]
+      );
+      ignoreMissingFiles <- Seq(true, false);
+      parDiscoveryThreshold <- Seq(0, 100)
+    ) {
+      withClue(s"raceCondition=$raceCondition, ignoreMissingFiles=$ignoreMissingFiles, " +
+        s"parDiscoveryThreshold=$parDiscoveryThreshold"
+      ) {
+        withSQLConf(
+          SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissingFiles.toString,
+          SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> parDiscoveryThreshold.toString,
+          "fs.mockFs.impl" -> raceCondition.getName,
+          "fs.mockFs.impl.disable.cache" -> "true"
+        ) {
+          def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex(
+            spark, Seq(rootDirPath), Map.empty, None)
+          if (ignoreMissingFiles) {
+            // We're ignoring missing files, so catalog construction should succeed
+            val catalog = makeCatalog()
+            val leafFiles = catalog.listLeafFiles(catalog.rootPaths)
+            if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) {
+              // The only subdirectory was missing, so there should be no leaf files:
+              assert(leafFiles.isEmpty)
+            } else {
+              assert(raceCondition == classOf[FileDeletionRaceFileSystem])
+              // One of the two leaf files was missing, but we should still list the other:
+              assert(leafFiles.size == 1)
+              assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
+            }
+          } else {
+            // We're NOT ignoring missing files, so catalog construction should fail
+            val e = intercept[Exception] {
+              makeCatalog()
+            }
+            // The exact exception depends on whether we're using parallel listing
+            if (parDiscoveryThreshold == 0) {
+              // The FileNotFoundException occurs in a Spark executor (as part of a job)
+              assert(e.isInstanceOf[SparkException])
+              assert(e.getMessage.contains("FileNotFoundException"))
+            } else {
+              // The FileNotFoundException occurs directly on the driver
+              assert(e.isInstanceOf[FileNotFoundException])
+              // Test that the FileNotFoundException is triggered for the expected reason:
+              if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) {
+                assert(e.getMessage.contains(subDirPath.toString))
+              } else {
+                assert(raceCondition == classOf[FileDeletionRaceFileSystem])
+                assert(e.getMessage.contains(leafFilePath.toString))
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
     for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
       withTempDir { dir =>
@@ -356,6 +418,66 @@
 }
 
+object DeletionRaceFileSystem {
+  val rootDirPath: Path = new Path("mockFs:///rootDir/")
+  val subDirPath: Path = new Path(rootDirPath, "subDir")
+  val leafFilePath: Path = new Path(subDirPath, "leafFile")
+  val nonDeletedLeafFilePath: Path = new Path(subDirPath, "nonDeletedLeafFile")
+  val rootListing: Array[FileStatus] =
+    Array(new FileStatus(0, true, 0, 0, 0, subDirPath))
+  val subFolderListing: Array[FileStatus] =
+    Array(
+      new FileStatus(0, false, 0, 100, 0, leafFilePath),
+      new FileStatus(0, false, 0, 100, 0, nonDeletedLeafFilePath))
+}
+
+// Used in SPARK-27676 test to simulate a race where a subdirectory is deleted
+// between back-to-back listing calls.
+class SubdirectoryDeletionRaceFileSystem extends RawLocalFileSystem {
+  import DeletionRaceFileSystem._
+
+  override def getScheme: String = "mockFs"
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    if (path == rootDirPath) {
+      rootListing
+    } else if (path == subDirPath) {
+      throw new FileNotFoundException(subDirPath.toString)
+    } else {
+      throw new IllegalArgumentException()
+    }
+  }
+}
+
+// Used in SPARK-27676 test to simulate a race where a file is deleted between
+// being listed and having its size / file status checked.
+class FileDeletionRaceFileSystem extends RawLocalFileSystem {
+  import DeletionRaceFileSystem._
+
+  override def getScheme: String = "mockFs"
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    if (path == rootDirPath) {
+      rootListing
+    } else if (path == subDirPath) {
+      subFolderListing
+    } else {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  override def getFileBlockLocations(
+      file: FileStatus,
+      start: Long,
+      len: Long): Array[BlockLocation] = {
+    if (file.getPath == leafFilePath) {
+      throw new FileNotFoundException(leafFilePath.toString)
+    } else {
+      Array.empty
+    }
+  }
+}
+
 class FakeParentPathFileSystem extends RawLocalFileSystem {
   override def getScheme: String = "mockFs"
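
Reviewer's note (not part of the patch): the migration-guide entry above can be tried end to end with a small driver program. The sketch below is a minimal, hypothetical illustration; the application name and the `/tmp/example/table` path are invented, and only the `spark.sql.files.ignoreMissingFiles` key comes from the change itself.

```scala
// Hypothetical walkthrough of the documented behavior change; not part of the patch.
import org.apache.spark.sql.SparkSession

object IgnoreMissingFilesWalkthrough {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ignore-missing-files-walkthrough")  // made-up app name
      .master("local[*]")
      .getOrCreate()

    // Default (false): since Spark 3.0, a file or subdirectory that shows up in an
    // intermediate listing but vanishes before a later phase of the recursive listing
    // makes table file listing / query planning fail with an exception.
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "false")

    // Setting the flag to true restores the lenient pre-3.0 behavior: missing files and
    // subdirectories are skipped during table file listing (and REFRESH TABLE), just as
    // they already were at query execution time.
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

    // Reading a directory-backed source triggers the InMemoryFileIndex listing touched by
    // this patch; the path below is a placeholder.
    val df = spark.read.parquet("/tmp/example/table")
    df.show()

    spark.stop()
  }
}
```

Reproducing the race itself requires a file system that drops entries between listing phases, which is exactly what the mock `SubdirectoryDeletionRaceFileSystem` and `FileDeletionRaceFileSystem` classes in the test diff simulate.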