diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0f5f1591623af..6ea0c07b034b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -734,30 +734,45 @@ object DataSource extends Logging {
    * Checks and returns files in all the paths.
    */
   private[sql] def checkAndGlobPathIfNecessary(
-      paths: Seq[String],
+      pathStrings: Seq[String],
       hadoopConf: Configuration,
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
-    val allGlobPath = paths.flatMap { path =>
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-      if (checkEmptyGlobPath && globPath.isEmpty) {
-        throw new AnalysisException(s"Path does not exist: $qualified")
+    val qualifiedPaths = pathStrings
+      .map { pathString =>
+        val path = new Path(pathString)
+        val fs = path.getFileSystem(hadoopConf)
+        path.makeQualified(fs.getUri, fs.getWorkingDirectory)
       }
-
-      // Sufficient to check head of the globPath seq for non-glob scenario
-      // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist && !fs.exists(globPath.head)) {
-        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+
+    // Split the paths into glob and non-glob paths, because we don't need a separate existence
+    // check for globbed paths: an empty glob result already signals a missing path.
+    val globPaths = qualifiedPaths
+      .filter(path => SparkHadoopUtil.get.isGlobPath(path))
+    val nonGlobPaths = qualifiedPaths
+      .filter(path => !SparkHadoopUtil.get.isGlobPath(path))
+
+    val globbedPaths = globPaths.par.flatMap { globPath =>
+      val fs = globPath.getFileSystem(hadoopConf)
+      val globResult = SparkHadoopUtil.get.globPath(fs, globPath)
+
+      if (checkEmptyGlobPath && globResult.isEmpty) {
+        throw new AnalysisException(s"Path does not exist: $globPath")
+      }
+
+      globResult
+    }
+
+    nonGlobPaths.par.foreach { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      if (checkFilesExist && !fs.exists(path)) {
+        throw new AnalysisException(s"Path does not exist: $path")
       }
-      globPath
     }
 
+    val allPaths = globbedPaths ++ nonGlobPaths
     if (checkFilesExist) {
-      val (filteredOut, filteredIn) = allGlobPath.partition { path =>
+      val (filteredOut, filteredIn) = allPaths.partition { path =>
         InMemoryFileIndex.shouldFilterOut(path.getName)
       }
       if (filteredIn.isEmpty) {
@@ -769,7 +784,7 @@ object DataSource extends Logging {
       }
     }
 
-    allGlobPath
+    allPaths.seq
   }
 
   /**
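Note on the .par calls above: Scala parallel collections run their tasks on the JVM-wide default ForkJoinPool, so the parallelism of the glob expansion and the existence checks is capped at roughly the number of available cores and shared with everything else in the process that uses that pool. If a bounded, dedicated pool is preferred, Spark's org.apache.spark.util.ThreadUtils.parmap is one option. A minimal sketch of the glob half under that assumption; the numThreads knob is hypothetical and not part of this patch:

    // Sketch only: the same glob expansion as in the patch, but run on a
    // dedicated pool of `numThreads` workers (hypothetical parameter)
    // instead of the shared global ForkJoinPool.
    import org.apache.spark.util.ThreadUtils

    val globbedPaths = ThreadUtils.parmap(globPaths, "glob-path", numThreads) { globPath =>
      val fs = globPath.getFileSystem(hadoopConf)
      val globResult = SparkHadoopUtil.get.globPath(fs, globPath)
      if (checkEmptyGlobPath && globResult.isEmpty) {
        throw new AnalysisException(s"Path does not exist: $globPath")
      }
      globResult
    }.flatten
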
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
new file mode 100644
index 0000000000000..ea3deb1a213bf
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DataSourceSuite extends SharedSparkSession {
+  import TestPaths._
+
+  test("test glob and non-glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        path1.toString,
+        path2.toString,
+        globPath1.toString,
+        globPath2.toString,
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+    )
+
+    assert(resultPaths.toSet == allPathsInFs.toSet)
+  }
+
+  test("test glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        globPath1.toString,
+        globPath2.toString,
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+    )
+
+    assert(
+      resultPaths.equals(
+        Seq(
+          globPath1Result1,
+          globPath1Result2,
+          globPath2Result1,
+          globPath2Result2,
+        )
+      )
+    )
+  }
+
+  test("test non-glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        path1.toString,
+        path2.toString,
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+    )
+
+    assert(
+      resultPaths.equals(
+        Seq(
+          path1,
+          path2,
+        )
+      )
+    )
+  }
+
+  test("test non-existent paths") {
+    assertThrows[AnalysisException](
+      DataSource.checkAndGlobPathIfNecessary(
+        Seq(
+          path1.toString,
+          path2.toString,
+          nonExistentPath.toString,
+        ),
+        hadoopConf,
+        checkEmptyGlobPath = true,
+        checkFilesExist = true,
+      )
+    )
+  }
+
+  test("test non-existent glob paths") {
+    assertThrows[AnalysisException](
+      DataSource.checkAndGlobPathIfNecessary(
+        Seq(
+          globPath1.toString,
+          globPath2.toString,
+          nonExistentGlobPath.toString,
+        ),
+        hadoopConf,
+        checkEmptyGlobPath = true,
+        checkFilesExist = true,
+      )
+    )
+  }
+}
+
+object TestPaths {
+  val hadoopConf = new Configuration()
+  hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)
+
+  val path1: Path = new Path("mockFs:///somepath1")
+  val path2: Path = new Path("mockFs:///somepath2")
+  val globPath1: Path = new Path("mockFs:///globpath1*")
+  val globPath2: Path = new Path("mockFs:///globpath2*")
+
+  val nonExistentPath: Path = new Path("mockFs:///nonexistentpath")
+  val nonExistentGlobPath: Path = new Path("mockFs:///nonexistentpath*")
+
+  val globPath1Result1: Path = new Path("mockFs:///globpath1/path1")
+  val globPath1Result2: Path = new Path("mockFs:///globpath1/path2")
+  val globPath2Result1: Path = new Path("mockFs:///globpath2/path1")
+  val globPath2Result2: Path = new Path("mockFs:///globpath2/path2")
+
+  val allPathsInFs: Seq[Path] = Seq(
+    path1,
+    path2,
+    globPath1Result1,
+    globPath1Result2,
+    globPath2Result1,
+    globPath2Result2,
+  )
+
+  val mockGlobResults: Map[Path, Array[FileStatus]] = Map(
+    globPath1 ->
+      Array(
+        createMockFileStatus(globPath1Result1.toString),
+        createMockFileStatus(globPath1Result2.toString),
+      ),
+    globPath2 ->
+      Array(
+        createMockFileStatus(globPath2Result1.toString),
+        createMockFileStatus(globPath2Result2.toString),
+      ),
+  )
+
+  def createMockFileStatus(path: String): FileStatus = {
+    val fileStatus = new FileStatus()
+    fileStatus.setPath(new Path(path))
+    fileStatus
+  }
+}
+
+class MockFileSystem extends RawLocalFileSystem {
+  import TestPaths._
+
+  override def exists(f: Path): Boolean = {
+    allPathsInFs.contains(f)
+  }
+
+  override def globStatus(pathPattern: Path): Array[FileStatus] = {
+    mockGlobResults.getOrElse(pathPattern, Array())
+  }
+}
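
As a usage note, the mock wiring in TestPaths can be exercised directly through the Hadoop FileSystem API, outside the suite; a minimal sketch reusing only the definitions above:

    // Resolving a mockFs:// path goes through the fs.mockFs.impl mapping
    // registered on TestPaths.hadoopConf, so a MockFileSystem is returned.
    import org.apache.hadoop.fs.Path

    val pattern = new Path("mockFs:///globpath1*")
    val fs = pattern.getFileSystem(TestPaths.hadoopConf)
    // globStatus is served from mockGlobResults, not from a real filesystem.
    val statuses = fs.globStatus(pattern)
    assert(statuses.map(_.getPath) sameElements
      Array(TestPaths.globPath1Result1, TestPaths.globPath1Result2))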