[SPARK-27676][SQL][SS] InMemoryFileIndex should respect spark.sql.files.ignoreMissingFiles #24668
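The diff below adds regression tests to `FileIndexSuite`. For context, the setting named in the title is an ordinary SQL session configuration; as a minimal, illustrative sketch (not part of this diff), it can be enabled like so:

```scala
// Illustrative only: when enabled, file listing and scans tolerate files
// that disappear after being discovered, instead of failing the query.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
```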
```diff
@@ -17,13 +17,14 @@
 package org.apache.spark.sql.execution.datasources

-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.net.URI

 import scala.collection.mutable

 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}

+import org.apache.spark.SparkException
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util._
```
```diff
@@ -167,7 +168,7 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }

-  test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
+  test("InMemoryFileIndex: root folders that don't exist don't throw exceptions") {
     withTempDir { dir =>
       val deletedFolder = new File(dir, "deleted")
       assert(!deletedFolder.exists())
```
```diff
@@ -178,6 +179,67 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }

+  test("SPARK-27676: InMemoryFileIndex respects ignoreMissingFiles config for non-root paths") {
```
Review thread on this line:

> There is one config …

> Good point. In the case of a parallel listing, this would cause the listing Spark job to fail with a `SparkException`. In the interests of complete test coverage, I'll update the test case to exercise the parallel listing path, too. (Combinatorial test coverage is hard!)
```diff
+    import DeletionRaceFileSystem._
+    for (
+      raceCondition <- Seq(
+        classOf[SubdirectoryDeletionRaceFileSystem],
+        classOf[FileDeletionRaceFileSystem]
+      );
+      ignoreMissingFiles <- Seq(true, false);
+      parDiscoveryThreshold <- Seq(0, 100)
+    ) {
+      withClue(s"raceCondition=$raceCondition, ignoreMissingFiles=$ignoreMissingFiles, " +
+          s"parDiscoveryThreshold=$parDiscoveryThreshold"
+      ) {
+        withSQLConf(
+          SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissingFiles.toString,
+          SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> parDiscoveryThreshold.toString,
+          "fs.mockFs.impl" -> raceCondition.getName,
+          "fs.mockFs.impl.disable.cache" -> "true"
+        ) {
+          def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex(
+            spark, Seq(rootDirPath), Map.empty, None)
+          if (ignoreMissingFiles) {
+            // We're ignoring missing files, so catalog construction should succeed
+            val catalog = makeCatalog()
+            val leafFiles = catalog.listLeafFiles(catalog.rootPaths)
+            if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) {
+              // The only subdirectory was missing, so there should be no leaf files:
+              assert(leafFiles.isEmpty)
+            } else {
+              assert(raceCondition == classOf[FileDeletionRaceFileSystem])
+              // One of the two leaf files was missing, but we should still list the other:
+              assert(leafFiles.size == 1)
+              assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
+            }
+          } else {
+            // We're NOT ignoring missing files, so catalog construction should fail
+            val e = intercept[Exception] {
+              makeCatalog()
+            }
+            // The exact exception depends on whether we're using parallel listing
+            if (parDiscoveryThreshold == 0) {
+              // The FileNotFoundException occurs in a Spark executor (as part of a job)
+              assert(e.isInstanceOf[SparkException])
+              assert(e.getMessage.contains("FileNotFoundException"))
+            } else {
+              // The FileNotFoundException occurs directly on the driver
+              assert(e.isInstanceOf[FileNotFoundException])
+              // Test that the FileNotFoundException is triggered for the expected reason:
+              if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) {
+                assert(e.getMessage.contains(subDirPath.toString))
+              } else {
+                assert(raceCondition == classOf[FileDeletionRaceFileSystem])
+                assert(e.getMessage.contains(leafFilePath.toString))
+              }
+            }
+          }
+        }
+      }
+    }
+  }

   test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
     for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
       withTempDir { dir =>
```
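The production-side fix in `InMemoryFileIndex` is not part of this test-file excerpt. As a rough sketch of the listing behavior the assertions above pin down — a missing child contributes no leaf files when the flag is on, and the exception propagates otherwise — consider this hypothetical helper (`listLeafStatuses` is illustrative, not Spark's actual code):

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Hypothetical sketch: recursively list leaf files, optionally tolerating
// paths that vanish between discovery and listing.
def listLeafStatuses(
    fs: FileSystem,
    path: Path,
    ignoreMissingFiles: Boolean): Seq[FileStatus] = {
  val children =
    try {
      fs.listStatus(path).toSeq
    } catch {
      // Directory deleted out from under us: treat it as empty rather than
      // failing the whole scan.
      case _: FileNotFoundException if ignoreMissingFiles => Seq.empty[FileStatus]
    }
  children.flatMap { child =>
    if (child.isDirectory) listLeafStatuses(fs, child.getPath, ignoreMissingFiles)
    else Seq(child)
  }
}
```

When `ignoreMissingFiles` is false, the `FileNotFoundException` escapes, which is exactly the failure mode the `else` branch of the test asserts.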
```diff
@@ -356,6 +418,66 @@ class FileIndexSuite extends SharedSQLContext {
 }

+object DeletionRaceFileSystem {
+  val rootDirPath: Path = new Path("mockFs:///rootDir/")
+  val subDirPath: Path = new Path(rootDirPath, "subDir")
+  val leafFilePath: Path = new Path(subDirPath, "leafFile")
+  val nonDeletedLeafFilePath: Path = new Path(subDirPath, "nonDeletedLeafFile")
+  val rootListing: Array[FileStatus] =
+    Array(new FileStatus(0, true, 0, 0, 0, subDirPath))
+  val subFolderListing: Array[FileStatus] =
+    Array(
+      new FileStatus(0, false, 0, 100, 0, leafFilePath),
+      new FileStatus(0, false, 0, 100, 0, nonDeletedLeafFilePath))
+}
```
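These shared paths and canned listings back the two mock filesystems defined next. The test's `fs.mockFs.impl` and `fs.mockFs.impl.disable.cache` entries rely on Hadoop's standard `fs.<scheme>.impl` scheme-to-class wiring; a minimal sketch of that resolution (class name taken from the diff below, otherwise illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
// Map the "mockFs" scheme to the custom FileSystem class...
conf.set("fs.mockFs.impl", classOf[SubdirectoryDeletionRaceFileSystem].getName)
// ...and disable the FileSystem cache so each lookup gets a fresh instance.
conf.setBoolean("fs.mockFs.impl.disable.cache", true)
val fs = new Path("mockFs:///rootDir/").getFileSystem(conf)
```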
```diff
+// Used in SPARK-27676 test to simulate a race where a subdirectory is deleted
+// between back-to-back listing calls.
+class SubdirectoryDeletionRaceFileSystem extends RawLocalFileSystem {
+  import DeletionRaceFileSystem._
+
+  override def getScheme: String = "mockFs"
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    if (path == rootDirPath) {
+      rootListing
+    } else if (path == subDirPath) {
+      throw new FileNotFoundException(subDirPath.toString)
+    } else {
+      throw new IllegalArgumentException()
+    }
+  }
+}
```
```diff
+// Used in SPARK-27676 test to simulate a race where a file is deleted between
+// being listed and having its size / file status checked.
+class FileDeletionRaceFileSystem extends RawLocalFileSystem {
+  import DeletionRaceFileSystem._
+
+  override def getScheme: String = "mockFs"
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    if (path == rootDirPath) {
+      rootListing
+    } else if (path == subDirPath) {
+      subFolderListing
+    } else {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  override def getFileBlockLocations(
+      file: FileStatus,
+      start: Long,
+      len: Long): Array[BlockLocation] = {
+    if (file.getPath == leafFilePath) {
+      throw new FileNotFoundException(leafFilePath.toString)
+    } else {
+      Array.empty
+    }
+  }
+}
```
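`getFileBlockLocations` is the hook here because, after a directory is listed, the scan fetches each file's block locations to build located statuses, and that second call is where a just-deleted file surfaces as a `FileNotFoundException`. A hypothetical sketch of that step (`toLocated` is an illustrative helper, not Spark's API):

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Turn a plain FileStatus into a LocatedFileStatus; throws
// FileNotFoundException if the file vanished after being listed.
def toLocated(fs: FileSystem, status: FileStatus): LocatedFileStatus = {
  val locations = fs.getFileBlockLocations(status, 0, status.getLen)
  new LocatedFileStatus(status, locations)
}
```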
```diff
 class FakeParentPathFileSystem extends RawLocalFileSystem {
   override def getScheme: String = "mockFs"
```