diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e9b8fae7cd735..430cdaff8d9ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
@@ -50,7 +51,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -734,30 +735,53 @@ object DataSource extends Logging {
    * Checks and returns files in all the paths.
    */
   private[sql] def checkAndGlobPathIfNecessary(
-      paths: Seq[String],
+      pathStrings: Seq[String],
       hadoopConf: Configuration,
       checkEmptyGlobPath: Boolean,
-      checkFilesExist: Boolean): Seq[Path] = {
-    val allGlobPath = paths.flatMap { path =>
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-      if (checkEmptyGlobPath && globPath.isEmpty) {
-        throw new AnalysisException(s"Path does not exist: $qualified")
+      checkFilesExist: Boolean,
+      numThreads: Integer = 40): Seq[Path] = {
+    val qualifiedPaths = pathStrings.map { pathString =>
+      val path = new Path(pathString)
+      val fs = path.getFileSystem(hadoopConf)
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    }
+
+    // Split the paths into glob and non-glob paths, because we don't need to do an existence
+    // check for globbed paths.
+    val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath)
+
+    val globbedPaths =
+      try {
+        ThreadUtils.parmap(globPaths, "globPath", numThreads) { globPath =>
+          val fs = globPath.getFileSystem(hadoopConf)
+          val globResult = SparkHadoopUtil.get.globPath(fs, globPath)
+
+          if (checkEmptyGlobPath && globResult.isEmpty) {
+            throw new AnalysisException(s"Path does not exist: $globPath")
+          }
+
+          globResult
+        }.flatten
+      } catch {
+        case e: SparkException => throw e.getCause
       }
 
-      // Sufficient to check head of the globPath seq for non-glob scenario
-      // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist && !fs.exists(globPath.head)) {
-        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+    if (checkFilesExist) {
+      try {
+        ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path =>
+          val fs = path.getFileSystem(hadoopConf)
+          if (!fs.exists(path)) {
+            throw new AnalysisException(s"Path does not exist: $path")
+          }
+        }
+      } catch {
+        case e: SparkException => throw e.getCause
      }
-      globPath
     }
 
+    val allPaths = globbedPaths ++ nonGlobPaths
     if (checkFilesExist) {
-      val (filteredOut, filteredIn) = allGlobPath.partition { path =>
+      val (filteredOut, filteredIn) = allPaths.partition { path =>
         InMemoryFileIndex.shouldFilterOut(path.getName)
       }
       if (filteredIn.isEmpty) {
@@ -769,7 +793,7 @@ object DataSource extends Logging {
       }
     }
 
-    allGlobPath
+    allPaths.toSeq
   }
 
   /**
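Reviewer note on the change above: `ThreadUtils.parmap` fans the glob expansion and existence checks out onto a thread pool and, as the diff implies, surfaces a failing task's exception wrapped in a `SparkException`; the `catch { case e: SparkException => throw e.getCause }` blocks unwrap it so callers still see the original `AnalysisException`. A minimal standalone sketch of the same fan-out pattern with plain Scala futures (everything here is illustrative, not Spark internals; paths and the thread cap are made up):

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object ParallelExistenceCheckSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val paths = Seq(new Path("/tmp/somepath1"), new Path("/tmp/somepath2"))

    val pool = Executors.newFixedThreadPool(math.min(paths.size, 40))
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    try {
      val checks = paths.map { path =>
        Future {
          // Each path may live on a different FileSystem, so resolve it per task.
          val fs = path.getFileSystem(hadoopConf)
          if (!fs.exists(path)) {
            throw new IllegalArgumentException(s"Path does not exist: $path")
          }
        }
      }
      // Unlike ThreadUtils.parmap, which wraps task failures in SparkException
      // (hence the `throw e.getCause` in the diff), Await.result rethrows the
      // original exception directly.
      Await.result(Future.sequence(checks), Duration.Inf)
    } finally {
      pool.shutdown()
    }
  }
}
```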
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
new file mode 100644
index 0000000000000..1e3c660e09454
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DataSourceSuite extends SharedSparkSession {
+  import TestPaths._
+
+  test("test glob and non glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        path1.toString,
+        path2.toString,
+        globPath1.toString,
+        globPath2.toString
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true
+    )
+
+    assert(resultPaths.toSet === allPathsInFs.toSet)
+  }
+
+  test("test glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        globPath1.toString,
+        globPath2.toString
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true
+    )
+
+    assert(
+      resultPaths.toSet === Set(
+        globPath1Result1,
+        globPath1Result2,
+        globPath2Result1,
+        globPath2Result2
+      )
+    )
+  }
+
+  test("test non glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        path1.toString,
+        path2.toString
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true
+    )
+
+    assert(
+      resultPaths.toSet === Set(
+        path1,
+        path2
+      )
+    )
+  }
+
+  test("test non glob paths checkFilesExist=false") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        path1.toString,
+        path2.toString,
+        nonExistentPath.toString
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = false
+    )
+
+    assert(
+      resultPaths.toSet === Set(
+        path1,
+        path2,
+        nonExistentPath
+      )
+    )
+  }
+
+  test("test non existent paths") {
+    assertThrows[AnalysisException](
+      DataSource.checkAndGlobPathIfNecessary(
+        Seq(
+          path1.toString,
+          path2.toString,
+          nonExistentPath.toString
+        ),
+        hadoopConf,
+        checkEmptyGlobPath = true,
+        checkFilesExist = true
+      )
+    )
+  }
+
+  test("test non existent glob paths") {
+    assertThrows[AnalysisException](
+      DataSource.checkAndGlobPathIfNecessary(
+        Seq(
+          globPath1.toString,
+          globPath2.toString,
+          nonExistentGlobPath.toString
+        ),
+        hadoopConf,
+        checkEmptyGlobPath = true,
+        checkFilesExist = true
+      )
+    )
+  }
+}
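The suite drives `checkAndGlobPathIfNecessary` directly; in normal use the same checks are reached from the read path whenever a load spans several paths. A hedged sketch of a call that exercises both branches (the paths are invented and assume a running `SparkSession` named `spark`):

```scala
// Illustrative only: the literal path goes through the parallel existence
// check, the glob pattern through parallel glob expansion.
val df = spark.read
  .option("header", "true")
  .csv("/data/events/static", "/data/events/2019-05-*")
```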
+
+object TestPaths {
+  val hadoopConf = new Configuration()
+  hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)
+
+  val path1 = new Path("mockFs://mockFs/somepath1")
+  val path2 = new Path("mockFs://mockFs/somepath2")
+  val globPath1 = new Path("mockFs://mockFs/globpath1*")
+  val globPath2 = new Path("mockFs://mockFs/globpath2*")
+
+  val nonExistentPath = new Path("mockFs://mockFs/nonexistentpath")
+  val nonExistentGlobPath = new Path("mockFs://mockFs/nonexistentpath*")
+
+  val globPath1Result1 = new Path("mockFs://mockFs/globpath1/path1")
+  val globPath1Result2 = new Path("mockFs://mockFs/globpath1/path2")
+  val globPath2Result1 = new Path("mockFs://mockFs/globpath2/path1")
+  val globPath2Result2 = new Path("mockFs://mockFs/globpath2/path2")
+
+  val allPathsInFs = Seq(
+    path1,
+    path2,
+    globPath1Result1,
+    globPath1Result2,
+    globPath2Result1,
+    globPath2Result2
+  )
+
+  val mockGlobResults: Map[Path, Array[FileStatus]] = Map(
+    globPath1 ->
+      Array(
+        createMockFileStatus(globPath1Result1.toString),
+        createMockFileStatus(globPath1Result2.toString)
+      ),
+    globPath2 ->
+      Array(
+        createMockFileStatus(globPath2Result1.toString),
+        createMockFileStatus(globPath2Result2.toString)
+      )
+  )
+
+  def createMockFileStatus(path: String): FileStatus = {
+    val fileStatus = new FileStatus()
+    fileStatus.setPath(new Path(path))
+    fileStatus
+  }
+}
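`TestPaths` only has to stub `exists` and `globStatus` because glob expansion via `SparkHadoopUtil.globPath` ultimately reduces to Hadoop's `FileSystem.globStatus`. Roughly (an approximation, not Spark's exact implementation, which also qualifies each result path):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// globStatus returns null when the path doesn't exist and an empty array when
// nothing matches, so both cases collapse to an empty Seq here.
def expandGlob(fs: FileSystem, pattern: Path): Seq[Path] =
  Option(fs.globStatus(pattern)).map(_.map(_.getPath).toSeq).getOrElse(Seq.empty)
```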
+
+class MockFileSystem extends RawLocalFileSystem {
+  import TestPaths._
+
+  override def exists(f: Path): Boolean = {
+    allPathsInFs.contains(f)
+  }
+
+  override def globStatus(pathPattern: Path): Array[FileStatus] = {
+    mockGlobResults.getOrElse(pathPattern, Array())
+  }
+}
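For completeness, the mock is wired in through Hadoop's standard `fs.<scheme>.impl` lookup, so any `Path` with the `mockFs` scheme resolves to `MockFileSystem`. A small usage sketch (assumes the test classes above are on the classpath):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
conf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)

// getFileSystem consults fs.mockFs.impl and instantiates MockFileSystem.
val pattern = new Path("mockFs://mockFs/globpath1*")
val fs = pattern.getFileSystem(conf)

// The overridden globStatus answers from mockGlobResults.
val matches = fs.globStatus(pattern).map(_.getPath).toSet
assert(matches == Set(TestPaths.globPath1Result1, TestPaths.globPath1Result2))
```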