From be97aa5d9dde13823f1cfc35d835fac2ca88e7ef Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 16 Sep 2019 00:07:50 -0700 Subject: [PATCH 01/23] SPARK-29089 First pass --- .../execution/datasources/DataSource.scala | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 0f5f1591623af..c5518c1f74335 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -734,30 +734,45 @@ object DataSource extends Logging { * Checks and returns files in all the paths. */ private[sql] def checkAndGlobPathIfNecessary( - paths: Seq[String], + pathStrings: Seq[String], hadoopConf: Configuration, checkEmptyGlobPath: Boolean, checkFilesExist: Boolean): Seq[Path] = { - val allGlobPath = paths.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(hadoopConf) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) - - if (checkEmptyGlobPath && globPath.isEmpty) { - throw new AnalysisException(s"Path does not exist: $qualified") + val qualifiedPaths = pathStrings + .map{pathString => + val path = new Path(pathString) + val fs = path.getFileSystem(hadoopConf) + path.makeQualified(fs.getUri, fs.getWorkingDirectory) } - // Sufficient to check head of the globPath seq for non-glob scenario - // Don't need to check once again if files exist in streaming mode - if (checkFilesExist && !fs.exists(globPath.head)) { - throw new AnalysisException(s"Path does not exist: ${globPath.head}") + // Split the paths into glob and non glob paths, because we don't need to do an existence check + // for non-glob paths. 
+ val globPaths = qualifiedPaths + .filter(path => SparkHadoopUtil.get.isGlobPath(path)) + val nonGlobPaths = qualifiedPaths + .filter(path => !SparkHadoopUtil.get.isGlobPath(path)) + + val globbedPaths = globPaths.flatMap { globPath => + val fs = globPath.getFileSystem(hadoopConf) + val globResult = SparkHadoopUtil.get.globPath(fs, globPath) + + if (checkEmptyGlobPath && globResult.isEmpty) { + throw new AnalysisException(s"Path does not exist: $globPath") + } + + globResult + } + + nonGlobPaths.foreach { path => + val fs = path.getFileSystem(hadoopConf) + if (checkFilesExist && !fs.exists(path)) { + throw new AnalysisException(s"Path does not exist: $path") } - globPath } + val allPaths = globbedPaths ++ nonGlobPaths if (checkFilesExist) { - val (filteredOut, filteredIn) = allGlobPath.partition { path => + val (filteredOut, filteredIn) = allPaths.partition { path => InMemoryFileIndex.shouldFilterOut(path.getName) } if (filteredIn.isEmpty) { @@ -769,7 +784,7 @@ object DataSource extends Logging { } } - allGlobPath + allPaths } /** From b0597f69d0b03cb219895e22f17e417f61773d56 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 16 Sep 2019 00:53:14 -0700 Subject: [PATCH 02/23] Use parallel collections --- .../apache/spark/sql/execution/datasources/DataSource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index c5518c1f74335..f4e1b5c59396e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -752,7 +752,7 @@ object DataSource extends Logging { val nonGlobPaths = qualifiedPaths .filter(path => !SparkHadoopUtil.get.isGlobPath(path)) - val globbedPaths = globPaths.flatMap { globPath => + val globbedPaths = globPaths.par.flatMap { globPath => val fs = globPath.getFileSystem(hadoopConf) val globResult = SparkHadoopUtil.get.globPath(fs, globPath) @@ -763,7 +763,7 @@ object DataSource extends Logging { globResult } - nonGlobPaths.foreach { path => + nonGlobPaths.par.foreach { path => val fs = path.getFileSystem(hadoopConf) if (checkFilesExist && !fs.exists(path)) { throw new AnalysisException(s"Path does not exist: $path") From 3b2d42d88194df63549327420fa5425a60c7f9e2 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 16 Sep 2019 06:03:28 -0700 Subject: [PATCH 03/23] First pass test --- .../datasources/DataSourceSuite.scala | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala new file mode 100644 index 0000000000000..015e05f33d172 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} +import org.apache.spark.sql.test.SharedSparkSession + +class DataSourceSuite extends SharedSparkSession { + import TestPaths._ + + test("test everything works") { + val hadoopConf = new Configuration() + hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName) + + val allPaths = DataSource.checkAndGlobPathIfNecessary( + Seq( + nonGlobPath1.toString, + nonGlobPath2.toString, + globPath1.toString, + globPath2.toString, + ), + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true + ) + + assertThrows(allPaths.equals(expectedResultPaths)) + } +} + +object TestPaths { + val nonGlobPath1: Path = new Path("mockFs:///somepath1") + val nonGlobPath2: Path = new Path("mockFs:///somepath2") + val globPath1: Path = new Path("mockFs:///globpath1*") + val globPath2: Path = new Path("mockFs:///globpath2*") + + val globPath1Result1: Path = new Path("mockFs:///globpath1/path1") + val globPath1Result2: Path = new Path("mockFs:///globpath1/path2") + val globPath2Result1: Path = new Path("mockFs:///globpath2/path1") + val globPath2Result2: Path = new Path("mockFs:///globpath2/path2") + + val expectedResultPaths = Seq( + globPath1, + globPath2, + globPath1Result1, + globPath1Result2, + globPath2Result1, + globPath2Result2, + ) + + val mockNonGlobPaths = Set(nonGlobPath1, nonGlobPath2) + val mockGlobResults: Map[Path, Array[FileStatus]] = Map( + globPath1 -> + Array( + createMockFileStatus(globPath1Result1.toString), + createMockFileStatus(globPath1Result2.toString), + ), + globPath2 -> + Array( + createMockFileStatus(globPath2Result1.toString), + createMockFileStatus(globPath2Result2.toString), + ), + ) + + def createMockFileStatus(path: String): FileStatus = { + val fileStatus = new FileStatus() + fileStatus.setPath(new Path(path)) + fileStatus + } +} + +class MockFileSystem extends RawLocalFileSystem { + import TestPaths._ + + override def exists(f: Path): Boolean = { + mockNonGlobPaths.contains(f) + } + + override def globStatus(pathPattern: Path): Array[FileStatus] = { + mockGlobResults.getOrElse(pathPattern, Array()) + } +} From 6887d4f669af9ebd378cf3f3ec679a6dfeebb772 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 00:11:49 -0700 Subject: [PATCH 04/23] Add tests --- .../datasources/DataSourceSuite.scala | 105 +++++++++++++++--- 1 file changed, 91 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 015e05f33d172..eb7fd84084948 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -19,52 +19,129 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} +import 
org.apache.spark.sql.AnalysisException import org.apache.spark.sql.test.SharedSparkSession class DataSourceSuite extends SharedSparkSession { import TestPaths._ - test("test everything works") { - val hadoopConf = new Configuration() - hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName) + test("test glob and non glob paths") { + val allPaths = DataSource.checkAndGlobPathIfNecessary( + Seq( + path1.toString, + path2.toString, + globPath1.toString, + globPath2.toString, + ), + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + ) + assert(allPaths.equals(allPathsInFs)) + } + + test("test glob paths") { val allPaths = DataSource.checkAndGlobPathIfNecessary( Seq( - nonGlobPath1.toString, - nonGlobPath2.toString, globPath1.toString, globPath2.toString, ), hadoopConf, checkEmptyGlobPath = true, - checkFilesExist = true + checkFilesExist = true, ) - assertThrows(allPaths.equals(expectedResultPaths)) + assert( + allPaths.equals( + Seq( + globPath1Result1, + globPath1Result2, + globPath2Result1, + globPath2Result2, + ) + ) + ) + } + + test("test non glob paths") { + val allPaths = DataSource.checkAndGlobPathIfNecessary( + Seq( + path1.toString, + path2.toString, + ), + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + ) + + assert( + allPaths.equals( + Seq( + path1, + path2, + ) + ) + ) + } + + test("test non existent paths") { + assertThrows[AnalysisException]( + DataSource.checkAndGlobPathIfNecessary( + Seq( + path1.toString, + path2.toString, + nonExistentPath.toString, + ), + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + ) + ) + } + + test("test non existent glob paths") { + assertThrows[AnalysisException]( + DataSource.checkAndGlobPathIfNecessary( + Seq( + globPath1.toString, + globPath2.toString, + nonExistentGlobPath.toString, + ), + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + ) + ) } } object TestPaths { - val nonGlobPath1: Path = new Path("mockFs:///somepath1") - val nonGlobPath2: Path = new Path("mockFs:///somepath2") + val hadoopConf = new Configuration() + hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName) + + val path1: Path = new Path("mockFs:///somepath1") + val path2: Path = new Path("mockFs:///somepath2") val globPath1: Path = new Path("mockFs:///globpath1*") val globPath2: Path = new Path("mockFs:///globpath2*") + val nonExistentPath: Path = new Path("mockFs:///nonexistentpath") + val nonExistentGlobPath: Path = new Path("mockFs:///nonexistentpath*") + val globPath1Result1: Path = new Path("mockFs:///globpath1/path1") val globPath1Result2: Path = new Path("mockFs:///globpath1/path2") val globPath2Result1: Path = new Path("mockFs:///globpath2/path1") val globPath2Result2: Path = new Path("mockFs:///globpath2/path2") - val expectedResultPaths = Seq( - globPath1, - globPath2, + val allPathsInFs = Seq( + path1, + path2, globPath1Result1, globPath1Result2, globPath2Result1, globPath2Result2, ) - val mockNonGlobPaths = Set(nonGlobPath1, nonGlobPath2) val mockGlobResults: Map[Path, Array[FileStatus]] = Map( globPath1 -> Array( @@ -89,7 +166,7 @@ class MockFileSystem extends RawLocalFileSystem { import TestPaths._ override def exists(f: Path): Boolean = { - mockNonGlobPaths.contains(f) + allPathsInFs.contains(f) } override def globStatus(pathPattern: Path): Array[FileStatus] = { From 1df3316ab7230eba2629285a328aa1ed4b2f525d Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 00:27:52 -0700 Subject: [PATCH 05/23] Update comments --- 
.../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index f4e1b5c59396e..7ee10c9759cf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -746,7 +746,7 @@ object DataSource extends Logging { } // Split the paths into glob and non glob paths, because we don't need to do an existence check - // for non-glob paths. + // for globbed paths. val globPaths = qualifiedPaths .filter(path => SparkHadoopUtil.get.isGlobPath(path)) val nonGlobPaths = qualifiedPaths From 65239f99b9443dd802d755056a374988429d697a Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 00:31:15 -0700 Subject: [PATCH 06/23] Convert parallel collection --- .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 7ee10c9759cf7..6ea0c07b034b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -784,7 +784,7 @@ object DataSource extends Logging { } } - allPaths + allPaths.seq } /** From 1d7975c77f204c7dc3e72088186c61c1cf688b44 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 00:39:46 -0700 Subject: [PATCH 07/23] Fix test --- .../sql/execution/datasources/DataSourceSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index eb7fd84084948..ea3deb1a213bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -26,7 +26,7 @@ class DataSourceSuite extends SharedSparkSession { import TestPaths._ test("test glob and non glob paths") { - val allPaths = DataSource.checkAndGlobPathIfNecessary( + val resultPaths = DataSource.checkAndGlobPathIfNecessary( Seq( path1.toString, path2.toString, @@ -38,11 +38,11 @@ class DataSourceSuite extends SharedSparkSession { checkFilesExist = true, ) - assert(allPaths.equals(allPathsInFs)) + assert(resultPaths.toSet == allPathsInFs.toSet) } test("test glob paths") { - val allPaths = DataSource.checkAndGlobPathIfNecessary( + val resultPaths = DataSource.checkAndGlobPathIfNecessary( Seq( globPath1.toString, globPath2.toString, @@ -53,7 +53,7 @@ class DataSourceSuite extends SharedSparkSession { ) assert( - allPaths.equals( + resultPaths.equals( Seq( globPath1Result1, globPath1Result2, @@ -65,7 +65,7 @@ class DataSourceSuite extends SharedSparkSession { } test("test non glob paths") { - val allPaths = DataSource.checkAndGlobPathIfNecessary( + val resultPaths = DataSource.checkAndGlobPathIfNecessary( Seq( path1.toString, path2.toString, @@ -76,7 +76,7 @@ class DataSourceSuite extends SharedSparkSession { ) assert( - allPaths.equals( + resultPaths.equals( Seq( path1, path2, 
From 2a24b5fc8c214d7a29f07ac953834b10d941492b Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 01:32:52 -0700 Subject: [PATCH 08/23] Remove trailing commas for Scalastyle --- .../datasources/DataSourceSuite.scala | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index ea3deb1a213bf..4b5f4d4193dcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.test.SharedSparkSession @@ -31,11 +32,11 @@ class DataSourceSuite extends SharedSparkSession { path1.toString, path2.toString, globPath1.toString, - globPath2.toString, + globPath2.toString ), hadoopConf, checkEmptyGlobPath = true, - checkFilesExist = true, + checkFilesExist = true ) assert(resultPaths.toSet == allPathsInFs.toSet) @@ -45,11 +46,11 @@ class DataSourceSuite extends SharedSparkSession { val resultPaths = DataSource.checkAndGlobPathIfNecessary( Seq( globPath1.toString, - globPath2.toString, + globPath2.toString ), hadoopConf, checkEmptyGlobPath = true, - checkFilesExist = true, + checkFilesExist = true ) assert( @@ -58,7 +59,7 @@ class DataSourceSuite extends SharedSparkSession { globPath1Result1, globPath1Result2, globPath2Result1, - globPath2Result2, + globPath2Result2 ) ) ) @@ -68,18 +69,18 @@ class DataSourceSuite extends SharedSparkSession { val resultPaths = DataSource.checkAndGlobPathIfNecessary( Seq( path1.toString, - path2.toString, + path2.toString ), hadoopConf, checkEmptyGlobPath = true, - checkFilesExist = true, + checkFilesExist = true ) assert( resultPaths.equals( Seq( path1, - path2, + path2 ) ) ) @@ -91,11 +92,11 @@ class DataSourceSuite extends SharedSparkSession { Seq( path1.toString, path2.toString, - nonExistentPath.toString, + nonExistentPath.toString ), hadoopConf, checkEmptyGlobPath = true, - checkFilesExist = true, + checkFilesExist = true ) ) } @@ -106,11 +107,11 @@ class DataSourceSuite extends SharedSparkSession { Seq( globPath1.toString, globPath2.toString, - nonExistentGlobPath.toString, + nonExistentGlobPath.toString ), hadoopConf, checkEmptyGlobPath = true, - checkFilesExist = true, + checkFilesExist = true ) ) } @@ -139,20 +140,20 @@ object TestPaths { globPath1Result1, globPath1Result2, globPath2Result1, - globPath2Result2, + globPath2Result2 ) val mockGlobResults: Map[Path, Array[FileStatus]] = Map( globPath1 -> Array( createMockFileStatus(globPath1Result1.toString), - createMockFileStatus(globPath1Result2.toString), + createMockFileStatus(globPath1Result2.toString) ), globPath2 -> Array( createMockFileStatus(globPath2Result1.toString), - createMockFileStatus(globPath2Result2.toString), - ), + createMockFileStatus(globPath2Result2.toString) + ) ) def createMockFileStatus(path: String): FileStatus = { From 0cd31adc4eb18c8d9c7acc95da5ccce55566190a Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 02:38:55 -0700 Subject: [PATCH 09/23] Use set comparisons in test --- .../datasources/DataSourceSuite.scala | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 
deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 4b5f4d4193dcd..e69843f003958 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -54,13 +54,11 @@ class DataSourceSuite extends SharedSparkSession { ) assert( - resultPaths.equals( - Seq( - globPath1Result1, - globPath1Result2, - globPath2Result1, - globPath2Result2 - ) + resultPaths.toSet == Set( + globPath1Result1, + globPath1Result2, + globPath2Result1, + globPath2Result2 ) ) } @@ -77,11 +75,9 @@ class DataSourceSuite extends SharedSparkSession { ) assert( - resultPaths.equals( - Seq( - path1, - path2 - ) + resultPaths.toSet == Set( + path1, + path2 ) ) } From e8a4d6db5e865e599bff6945f7c69d26225f28ef Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 19:17:55 -0700 Subject: [PATCH 10/23] Map formatting --- .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 6ea0c07b034b4..93d445f0eeea9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -739,7 +739,7 @@ object DataSource extends Logging { checkEmptyGlobPath: Boolean, checkFilesExist: Boolean): Seq[Path] = { val qualifiedPaths = pathStrings - .map{pathString => + .map { pathString => val path = new Path(pathString) val fs = path.getFileSystem(hadoopConf) path.makeQualified(fs.getUri, fs.getWorkingDirectory) From 99d6590edb4bfa4c9c6cfa0ded9895687b5437fc Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 19:18:15 -0700 Subject: [PATCH 11/23] Use partition function --- .../apache/spark/sql/execution/datasources/DataSource.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 93d445f0eeea9..e9b9887103198 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -747,10 +747,7 @@ object DataSource extends Logging { // Split the paths into glob and non glob paths, because we don't need to do an existence check // for globbed paths. 
- val globPaths = qualifiedPaths - .filter(path => SparkHadoopUtil.get.isGlobPath(path)) - val nonGlobPaths = qualifiedPaths - .filter(path => !SparkHadoopUtil.get.isGlobPath(path)) + val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath) val globbedPaths = globPaths.par.flatMap { globPath => val fs = globPath.getFileSystem(hadoopConf) From 41f6cfa07381244e0f1c5bacddeab670ea7050b5 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 19:18:49 -0700 Subject: [PATCH 12/23] Use === in asserts --- .../spark/sql/execution/datasources/DataSourceSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index e69843f003958..c2ff3a564b530 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -39,7 +39,7 @@ class DataSourceSuite extends SharedSparkSession { checkFilesExist = true ) - assert(resultPaths.toSet == allPathsInFs.toSet) + assert(resultPaths.toSet === allPathsInFs.toSet) } test("test glob paths") { @@ -54,7 +54,7 @@ class DataSourceSuite extends SharedSparkSession { ) assert( - resultPaths.toSet == Set( + resultPaths.toSet === Set( globPath1Result1, globPath1Result2, globPath2Result1, @@ -75,7 +75,7 @@ class DataSourceSuite extends SharedSparkSession { ) assert( - resultPaths.toSet == Set( + resultPaths.toSet === Set( path1, path2 ) From ba36de4280babd2c564fddeba1c166186747eded Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 19:19:06 -0700 Subject: [PATCH 13/23] Add authority to test paths --- .../datasources/DataSourceSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index c2ff3a564b530..442f7d73921e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -117,18 +117,18 @@ object TestPaths { val hadoopConf = new Configuration() hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName) - val path1: Path = new Path("mockFs:///somepath1") - val path2: Path = new Path("mockFs:///somepath2") - val globPath1: Path = new Path("mockFs:///globpath1*") - val globPath2: Path = new Path("mockFs:///globpath2*") - - val nonExistentPath: Path = new Path("mockFs:///nonexistentpath") - val nonExistentGlobPath: Path = new Path("mockFs:///nonexistentpath*") - - val globPath1Result1: Path = new Path("mockFs:///globpath1/path1") - val globPath1Result2: Path = new Path("mockFs:///globpath1/path2") - val globPath2Result1: Path = new Path("mockFs:///globpath2/path1") - val globPath2Result2: Path = new Path("mockFs:///globpath2/path2") + val path1: Path = new Path("mockFs://mockFs/somepath1") + val path2: Path = new Path("mockFs://mockFs/somepath2") + val globPath1: Path = new Path("mockFs://mockFs/globpath1*") + val globPath2: Path = new Path("mockFs://mockFs/globpath2*") + + val nonExistentPath: Path = new Path("mockFs://mockFs/nonexistentpath") + val nonExistentGlobPath: Path = new Path("mockFs://mockFs/nonexistentpath*") + + val 
globPath1Result1: Path = new Path("mockFs://mockFs/globpath1/path1") + val globPath1Result2: Path = new Path("mockFs://mockFs/globpath1/path2") + val globPath2Result1: Path = new Path("mockFs://mockFs/globpath2/path1") + val globPath2Result2: Path = new Path("mockFs://mockFs/globpath2/path2") val allPathsInFs = Seq( path1, From 06b2b453951debb2eb575b9007d26be6461a19e8 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 23 Sep 2019 19:54:45 -0700 Subject: [PATCH 14/23] Use ThreadUtils instead of parallel collections --- .../execution/datasources/DataSource.scala | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index e9b9887103198..86b6c950b466b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -50,7 +51,7 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType} import org.apache.spark.sql.util.SchemaUtils -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to @@ -749,22 +750,31 @@ object DataSource extends Logging { // for globbed paths. 
val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath) - val globbedPaths = globPaths.par.flatMap { globPath => - val fs = globPath.getFileSystem(hadoopConf) - val globResult = SparkHadoopUtil.get.globPath(fs, globPath) + val globbedPaths = Try({ + ThreadUtils.parmap(globPaths, "globPath", 8) { globPath => + val fs = globPath.getFileSystem(hadoopConf) + val globResult = SparkHadoopUtil.get.globPath(fs, globPath) - if (checkEmptyGlobPath && globResult.isEmpty) { - throw new AnalysisException(s"Path does not exist: $globPath") - } + if (checkEmptyGlobPath && globResult.isEmpty) { + throw new AnalysisException(s"Path does not exist: $globPath") + } + + globResult + }.flatten + }).recoverWith({ + case e: SparkException => throw e.getCause + }).get - globResult - } - nonGlobPaths.par.foreach { path => - val fs = path.getFileSystem(hadoopConf) - if (checkFilesExist && !fs.exists(path)) { - throw new AnalysisException(s"Path does not exist: $path") + try { + ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", 8) { path => + val fs = path.getFileSystem(hadoopConf) + if (checkFilesExist && !fs.exists(path)) { + throw new AnalysisException(s"Path does not exist: $path") + } } + } catch { + case e: SparkException => throw e.getCause } val allPaths = globbedPaths ++ nonGlobPaths From bcd97af9ccaef91d3f945c72d9c5868b2e2104d1 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Tue, 24 Sep 2019 02:29:59 -0700 Subject: [PATCH 15/23] Increase num threads to 20 --- .../apache/spark/sql/execution/datasources/DataSource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 86b6c950b466b..308c960917c43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -751,7 +751,7 @@ object DataSource extends Logging { val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath) val globbedPaths = Try({ - ThreadUtils.parmap(globPaths, "globPath", 8) { globPath => + ThreadUtils.parmap(globPaths, "globPath", 20) { globPath => val fs = globPath.getFileSystem(hadoopConf) val globResult = SparkHadoopUtil.get.globPath(fs, globPath) @@ -767,7 +767,7 @@ object DataSource extends Logging { try { - ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", 8) { path => + ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", 20) { path => val fs = path.getFileSystem(hadoopConf) if (checkFilesExist && !fs.exists(path)) { throw new AnalysisException(s"Path does not exist: $path") From 3a7d23a839f48ed8bc4db2408f910659890a2d96 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Wed, 25 Sep 2019 14:44:17 -0700 Subject: [PATCH 16/23] Change map style --- .../spark/sql/execution/datasources/DataSource.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 308c960917c43..c34e32f9cf8ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -739,12 +739,11 @@ object DataSource extends Logging { hadoopConf: Configuration, 
checkEmptyGlobPath: Boolean, checkFilesExist: Boolean): Seq[Path] = { - val qualifiedPaths = pathStrings - .map { pathString => - val path = new Path(pathString) - val fs = path.getFileSystem(hadoopConf) - path.makeQualified(fs.getUri, fs.getWorkingDirectory) - } + val qualifiedPaths = pathStrings.map { pathString => + val path = new Path(pathString) + val fs = path.getFileSystem(hadoopConf) + path.makeQualified(fs.getUri, fs.getWorkingDirectory) + } // Split the paths into glob and non glob paths, because we don't need to do an existence check // for globbed paths. From 7c61252fec9b23bcec173afe37a8d6db533a98d5 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Wed, 25 Sep 2019 15:57:49 -0700 Subject: [PATCH 17/23] Don't launch unnecessary parallel exists task --- .../execution/datasources/DataSource.scala | 16 +++++++------- .../datasources/DataSourceSuite.scala | 21 +++++++++++++++++++ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index c34e32f9cf8ea..8dfe0b7c83d9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -765,15 +765,17 @@ object DataSource extends Logging { }).get - try { - ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", 20) { path => - val fs = path.getFileSystem(hadoopConf) - if (checkFilesExist && !fs.exists(path)) { - throw new AnalysisException(s"Path does not exist: $path") + if (checkFilesExist) { + try { + ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", 20) { path => + val fs = path.getFileSystem(hadoopConf) + if (!fs.exists(path)) { + throw new AnalysisException(s"Path does not exist: $path") + } } + } catch { + case e: SparkException => throw e.getCause } - } catch { - case e: SparkException => throw e.getCause } val allPaths = globbedPaths ++ nonGlobPaths diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 442f7d73921e0..9fd5b4259279b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -82,6 +82,27 @@ class DataSourceSuite extends SharedSparkSession { ) } + test("test non glob paths checkFilesExist=false") { + val resultPaths = DataSource.checkAndGlobPathIfNecessary( + Seq( + path1.toString, + path2.toString, + nonExistentPath.toString + ), + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = false + ) + + assert( + resultPaths.toSet === Set( + path1, + path2, + nonExistentPath + ) + ) + } + test("test non existent paths") { assertThrows[AnalysisException]( DataSource.checkAndGlobPathIfNecessary( From 2bed5b993de70fec7dcda74cc368a43aa731032f Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Fri, 25 Oct 2019 18:17:04 -0700 Subject: [PATCH 18/23] Use normal try catch --- .../execution/datasources/DataSource.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 29115e9d85e65..0b48b6f5cf5de 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -749,21 +749,21 @@ object DataSource extends Logging { // for globbed paths. val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath) - val globbedPaths = Try({ - ThreadUtils.parmap(globPaths, "globPath", 20) { globPath => - val fs = globPath.getFileSystem(hadoopConf) - val globResult = SparkHadoopUtil.get.globPath(fs, globPath) - - if (checkEmptyGlobPath && globResult.isEmpty) { - throw new AnalysisException(s"Path does not exist: $globPath") - } + val globbedPaths = + try { + ThreadUtils.parmap(globPaths, "globPath", 20) { globPath => + val fs = globPath.getFileSystem(hadoopConf) + val globResult = SparkHadoopUtil.get.globPath(fs, globPath) - globResult - }.flatten - }).recoverWith({ - case e: SparkException => throw e.getCause - }).get + if (checkEmptyGlobPath && globResult.isEmpty) { + throw new AnalysisException(s"Path does not exist: $globPath") + } + globResult + }.flatten + } catch { + case e: SparkException => throw e.getCause + } if (checkFilesExist) { try { From dc48da8ca62ae0f84bb3f73aa4476066d3f963db Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Mon, 11 Nov 2019 13:17:34 -0800 Subject: [PATCH 19/23] Set numThreads to 40 --- .../spark/sql/execution/datasources/DataSource.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 0b48b6f5cf5de..e3429e3903175 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -738,7 +738,8 @@ object DataSource extends Logging { pathStrings: Seq[String], hadoopConf: Configuration, checkEmptyGlobPath: Boolean, - checkFilesExist: Boolean): Seq[Path] = { + checkFilesExist: Boolean, + numThreads: Integer = 40): Seq[Path] = { val qualifiedPaths = pathStrings.map { pathString => val path = new Path(pathString) val fs = path.getFileSystem(hadoopConf) @@ -751,7 +752,7 @@ object DataSource extends Logging { val globbedPaths = try { - ThreadUtils.parmap(globPaths, "globPath", 20) { globPath => + ThreadUtils.parmap(globPaths, "globPath", numThreads) { globPath => val fs = globPath.getFileSystem(hadoopConf) val globResult = SparkHadoopUtil.get.globPath(fs, globPath) @@ -767,7 +768,7 @@ object DataSource extends Logging { if (checkFilesExist) { try { - ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", 20) { path => + ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path => val fs = path.getFileSystem(hadoopConf) if (!fs.exists(path)) { throw new AnalysisException(s"Path does not exist: $path") From 8cbc28aad9aa34ad0ab0d16609863cbfbea176d7 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Tue, 10 Dec 2019 16:00:28 -0800 Subject: [PATCH 20/23] Pull out getFileSystem --- .../spark/sql/execution/datasources/DataSource.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index e3429e3903175..3a77e3419b91a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -740,9 +740,14 @@ object DataSource extends Logging { checkEmptyGlobPath: Boolean, checkFilesExist: Boolean, numThreads: Integer = 40): Seq[Path] = { + if (pathStrings.isEmpty) { + return Seq.empty + } + + val fs = new Path(pathStrings.head).getFileSystem(hadoopConf) + val qualifiedPaths = pathStrings.map { pathString => val path = new Path(pathString) - val fs = path.getFileSystem(hadoopConf) path.makeQualified(fs.getUri, fs.getWorkingDirectory) } @@ -753,7 +758,6 @@ object DataSource extends Logging { val globbedPaths = try { ThreadUtils.parmap(globPaths, "globPath", numThreads) { globPath => - val fs = globPath.getFileSystem(hadoopConf) val globResult = SparkHadoopUtil.get.globPath(fs, globPath) if (checkEmptyGlobPath && globResult.isEmpty) { @@ -769,7 +773,6 @@ object DataSource extends Logging { if (checkFilesExist) { try { ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path => - val fs = path.getFileSystem(hadoopConf) if (!fs.exists(path)) { throw new AnalysisException(s"Path does not exist: $path") } From c52b3b08d9004c68b703bad73b27b348a63afc23 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Tue, 10 Dec 2019 16:02:08 -0800 Subject: [PATCH 21/23] Use toSeq instead of seq --- .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 3a77e3419b91a..ac237f9836bc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -796,7 +796,7 @@ object DataSource extends Logging { } } - allPaths.seq + allPaths.toSeq } /** From a3cdc59e4770e9370a30ce8824c3e50e16ecaa61 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Tue, 10 Dec 2019 16:02:35 -0800 Subject: [PATCH 22/23] Remove : Path type --- .../datasources/DataSourceSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 9fd5b4259279b..1e3c660e09454 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -138,18 +138,18 @@ object TestPaths { val hadoopConf = new Configuration() hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName) - val path1: Path = new Path("mockFs://mockFs/somepath1") - val path2: Path = new Path("mockFs://mockFs/somepath2") - val globPath1: Path = new Path("mockFs://mockFs/globpath1*") - val globPath2: Path = new Path("mockFs://mockFs/globpath2*") - - val nonExistentPath: Path = new Path("mockFs://mockFs/nonexistentpath") - val nonExistentGlobPath: Path = new Path("mockFs://mockFs/nonexistentpath*") - - val globPath1Result1: Path = new Path("mockFs://mockFs/globpath1/path1") - val globPath1Result2: Path = new Path("mockFs://mockFs/globpath1/path2") - val globPath2Result1: Path = new Path("mockFs://mockFs/globpath2/path1") - val globPath2Result2: Path = new Path("mockFs://mockFs/globpath2/path2") + val path1 = new Path("mockFs://mockFs/somepath1") + val path2 = new 
Path("mockFs://mockFs/somepath2") + val globPath1 = new Path("mockFs://mockFs/globpath1*") + val globPath2 = new Path("mockFs://mockFs/globpath2*") + + val nonExistentPath = new Path("mockFs://mockFs/nonexistentpath") + val nonExistentGlobPath = new Path("mockFs://mockFs/nonexistentpath*") + + val globPath1Result1 = new Path("mockFs://mockFs/globpath1/path1") + val globPath1Result2 = new Path("mockFs://mockFs/globpath1/path2") + val globPath2Result1 = new Path("mockFs://mockFs/globpath2/path1") + val globPath2Result2 = new Path("mockFs://mockFs/globpath2/path2") val allPathsInFs = Seq( path1, From 105235ce1cdbdfffb912baba02d31abd009f44c2 Mon Sep 17 00:00:00 2001 From: Arwin Tio Date: Wed, 11 Dec 2019 00:28:52 -0800 Subject: [PATCH 23/23] Revert "Pull out getFileSystem" This reverts commit 8cbc28aad9aa34ad0ab0d16609863cbfbea176d7. --- .../spark/sql/execution/datasources/DataSource.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index ac237f9836bc4..430cdaff8d9ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -740,14 +740,9 @@ object DataSource extends Logging { checkEmptyGlobPath: Boolean, checkFilesExist: Boolean, numThreads: Integer = 40): Seq[Path] = { - if (pathStrings.isEmpty) { - return Seq.empty - } - - val fs = new Path(pathStrings.head).getFileSystem(hadoopConf) - val qualifiedPaths = pathStrings.map { pathString => val path = new Path(pathString) + val fs = path.getFileSystem(hadoopConf) path.makeQualified(fs.getUri, fs.getWorkingDirectory) } @@ -758,6 +753,7 @@ object DataSource extends Logging { val globbedPaths = try { ThreadUtils.parmap(globPaths, "globPath", numThreads) { globPath => + val fs = globPath.getFileSystem(hadoopConf) val globResult = SparkHadoopUtil.get.globPath(fs, globPath) if (checkEmptyGlobPath && globResult.isEmpty) { @@ -773,6 +769,7 @@ object DataSource extends Logging { if (checkFilesExist) { try { ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path => + val fs = path.getFileSystem(hadoopConf) if (!fs.exists(path)) { throw new AnalysisException(s"Path does not exist: $path") }