Commit 8b24d85

Merge pull request #1 from cozos/SPARK-29089
SPARK-29089 Parallelize DataSource#checkAndGlobPathIfNecessary
2 parents 1e9fac8 + 1d7975c commit 8b24d85

File tree

2 files changed: +206 −16 lines changed

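What the change does: previously, checkAndGlobPathIfNecessary qualified, globbed, and existence-checked each input path one at a time, so driver-side startup cost grew linearly with the number of paths. The new code fans those per-path filesystem calls out with Scala parallel collections (.par), which matters most against high-latency stores such as S3. Below is a minimal, self-contained sketch of the idea — not code from this commit; expensiveCheck is a hypothetical stand-in for a per-path filesystem RPC, and it assumes Scala 2.12, where parallel collections ship in the standard library:

object ParGlobSketch extends App {
  // Hypothetical stand-in for a per-path filesystem call (glob or exists).
  def expensiveCheck(path: String): String = {
    Thread.sleep(100) // simulate a slow round trip to the filesystem
    s"$path: ok"
  }

  val paths = (1 to 8).map(i => s"/data/part$i")

  // Sequential: latency adds up, roughly 8 x 100 ms here.
  val sequential = paths.map(expensiveCheck)

  // Parallel: .par fans the calls out over the default task support
  // (backed by scala.concurrent.ExecutionContext.global), so total
  // latency is roughly that of the slowest single call.
  val parallel = paths.par.map(expensiveCheck).seq

  assert(sequential == parallel) // map preserves element order, even in parallel
}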

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 31 additions & 16 deletions
@@ -734,30 +734,45 @@ object DataSource extends Logging {
    * Checks and returns files in all the paths.
    */
   private[sql] def checkAndGlobPathIfNecessary(
-      paths: Seq[String],
+      pathStrings: Seq[String],
       hadoopConf: Configuration,
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
-    val allGlobPath = paths.flatMap { path =>
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-      if (checkEmptyGlobPath && globPath.isEmpty) {
-        throw new AnalysisException(s"Path does not exist: $qualified")
+    val qualifiedPaths = pathStrings
+      .map { pathString =>
+        val path = new Path(pathString)
+        val fs = path.getFileSystem(hadoopConf)
+        path.makeQualified(fs.getUri, fs.getWorkingDirectory)
       }
 
-      // Sufficient to check head of the globPath seq for non-glob scenario
-      // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist && !fs.exists(globPath.head)) {
-        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+    // Split the paths into glob and non-glob paths, because we don't need to do an existence
+    // check for globbed paths.
+    val globPaths = qualifiedPaths
+      .filter(path => SparkHadoopUtil.get.isGlobPath(path))
+    val nonGlobPaths = qualifiedPaths
+      .filter(path => !SparkHadoopUtil.get.isGlobPath(path))
+
+    val globbedPaths = globPaths.par.flatMap { globPath =>
+      val fs = globPath.getFileSystem(hadoopConf)
+      val globResult = SparkHadoopUtil.get.globPath(fs, globPath)
+
+      if (checkEmptyGlobPath && globResult.isEmpty) {
+        throw new AnalysisException(s"Path does not exist: $globPath")
+      }
+
+      globResult
+    }
+
+    nonGlobPaths.par.foreach { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      if (checkFilesExist && !fs.exists(path)) {
+        throw new AnalysisException(s"Path does not exist: $path")
       }
-      globPath
     }
 
+    val allPaths = globbedPaths ++ nonGlobPaths
     if (checkFilesExist) {
-      val (filteredOut, filteredIn) = allGlobPath.partition { path =>
+      val (filteredOut, filteredIn) = allPaths.partition { path =>
         InMemoryFileIndex.shouldFilterOut(path.getName)
       }
       if (filteredIn.isEmpty) {

@@ -769,7 +784,7 @@ object DataSource extends Logging {
       }
     }
 
-    allGlobPath
+    allPaths.seq
   }
 
   /**
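A note on the return value: because globbedPaths and nonGlobPaths are built via .par, allPaths is a parallel sequence (ParSeq[Path]), and the trailing allPaths.seq converts it back to the plain Seq[Path] the method signature promises. A tiny standalone illustration of that round trip (assumes Scala 2.12):

import scala.collection.parallel.ParSeq

val doubled: ParSeq[Int] = Seq(1, 2, 3).par.map(_ * 2) // parallel map; result order is preserved
val back: Seq[Int] = doubled.seq                       // sequential view of the same elements
assert(back == Seq(2, 4, 6))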
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala

Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DataSourceSuite extends SharedSparkSession {
+  import TestPaths._
+
+  test("test glob and non glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        path1.toString,
+        path2.toString,
+        globPath1.toString,
+        globPath2.toString,
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+    )
+
+    assert(resultPaths.toSet == allPathsInFs.toSet)
+  }
+
+  test("test glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        globPath1.toString,
+        globPath2.toString,
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+    )
+
+    assert(
+      resultPaths.equals(
+        Seq(
+          globPath1Result1,
+          globPath1Result2,
+          globPath2Result1,
+          globPath2Result2,
+        )
+      )
+    )
+  }
+
+  test("test non glob paths") {
+    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
+      Seq(
+        path1.toString,
+        path2.toString,
+      ),
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+    )
+
+    assert(
+      resultPaths.equals(
+        Seq(
+          path1,
+          path2,
+        )
+      )
+    )
+  }
+
+  test("test non existent paths") {
+    assertThrows[AnalysisException](
+      DataSource.checkAndGlobPathIfNecessary(
+        Seq(
+          path1.toString,
+          path2.toString,
+          nonExistentPath.toString,
+        ),
+        hadoopConf,
+        checkEmptyGlobPath = true,
+        checkFilesExist = true,
+      )
+    )
+  }
+
+  test("test non existent glob paths") {
+    assertThrows[AnalysisException](
+      DataSource.checkAndGlobPathIfNecessary(
+        Seq(
+          globPath1.toString,
+          globPath2.toString,
+          nonExistentGlobPath.toString,
+        ),
+        hadoopConf,
+        checkEmptyGlobPath = true,
+        checkFilesExist = true,
+      )
+    )
+  }
+}
+
+object TestPaths {
+  val hadoopConf = new Configuration()
+  hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)
+
+  val path1: Path = new Path("mockFs:///somepath1")
+  val path2: Path = new Path("mockFs:///somepath2")
+  val globPath1: Path = new Path("mockFs:///globpath1*")
+  val globPath2: Path = new Path("mockFs:///globpath2*")
+
+  val nonExistentPath: Path = new Path("mockFs:///nonexistentpath")
+  val nonExistentGlobPath: Path = new Path("mockFs:///nonexistentpath*")
+
+  val globPath1Result1: Path = new Path("mockFs:///globpath1/path1")
+  val globPath1Result2: Path = new Path("mockFs:///globpath1/path2")
+  val globPath2Result1: Path = new Path("mockFs:///globpath2/path1")
+  val globPath2Result2: Path = new Path("mockFs:///globpath2/path2")
+
+  val allPathsInFs = Seq(
+    path1,
+    path2,
+    globPath1Result1,
+    globPath1Result2,
+    globPath2Result1,
+    globPath2Result2,
+  )
+
+  val mockGlobResults: Map[Path, Array[FileStatus]] = Map(
+    globPath1 ->
+      Array(
+        createMockFileStatus(globPath1Result1.toString),
+        createMockFileStatus(globPath1Result2.toString),
+      ),
+    globPath2 ->
+      Array(
+        createMockFileStatus(globPath2Result1.toString),
+        createMockFileStatus(globPath2Result2.toString),
+      ),
+  )
+
+  def createMockFileStatus(path: String): FileStatus = {
+    val fileStatus = new FileStatus()
+    fileStatus.setPath(new Path(path))
+    fileStatus
+  }
+}
+
+class MockFileSystem extends RawLocalFileSystem {
+  import TestPaths._
+
+  override def exists(f: Path): Boolean = {
+    allPathsInFs.contains(f)
+  }
+
+  override def globStatus(pathPattern: Path): Array[FileStatus] = {
+    mockGlobResults.getOrElse(pathPattern, Array())
+  }
+}
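How the mock is wired in: Hadoop resolves the FileSystem implementation for a URI scheme through the fs.<scheme>.impl configuration key, so the hadoopConf.set("fs.mockFs.impl", ...) line above is what routes every mockFs:/// path touched by checkAndGlobPathIfNecessary to MockFileSystem's canned exists/globStatus answers — no real filesystem is involved. A standalone sketch of the same trick under a made-up scheme (NoopFs, SchemeWiring, and the "noop" scheme are illustrative names, not part of the commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

// Hypothetical FileSystem bound to the made-up "noop" scheme.
class NoopFs extends RawLocalFileSystem {
  override def exists(f: Path): Boolean = true // pretend every path exists
}

object SchemeWiring extends App {
  val conf = new Configuration()
  conf.set("fs.noop.impl", classOf[NoopFs].getName) // scheme -> implementation

  val fs = new Path("noop:///anything").getFileSystem(conf)
  assert(fs.isInstanceOf[NoopFs])
  assert(fs.exists(new Path("noop:///anything")))
}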
