Status: Closed

Commits (28, changes shown from all commits)
be97aa5  SPARK-29089 First pass (cozos, Sep 16, 2019)
b0597f6  Use parallel collections (cozos, Sep 16, 2019)
3b2d42d  First pass test (cozos, Sep 16, 2019)
6887d4f  Add tests (cozos, Sep 23, 2019)
1e9fac8  Merge pull request #2 from apache/master (cozos, Sep 23, 2019)
8a471ff  Merge branch 'master' of github.com:cozos/spark into SPARK-29089 (cozos, Sep 23, 2019)
1df3316  Update comments (cozos, Sep 23, 2019)
65239f9  Convert parallel collection (cozos, Sep 23, 2019)
1d7975c  Fix test (cozos, Sep 23, 2019)
8b24d85  Merge pull request #1 from cozos/SPARK-29089 (cozos, Sep 23, 2019)
2a24b5f  Remove trailing commas for Scalastyle (cozos, Sep 23, 2019)
0cd31ad  Use set comparisons in test (cozos, Sep 23, 2019)
e8a4d6d  Map formatting (cozos, Sep 24, 2019)
99d6590  Use partition function (cozos, Sep 24, 2019)
41f6cfa  Use === in asserts (cozos, Sep 24, 2019)
ba36de4  Add authority to test paths (cozos, Sep 24, 2019)
06b2b45  Use ThreadUtils instead of parallel collections (cozos, Sep 24, 2019)
bcd97af  Increase num threads to 20 (cozos, Sep 24, 2019)
3a7d23a  Change map style (cozos, Sep 25, 2019)
7c61252  Don't launch unnecessary parallel exists task (cozos, Sep 25, 2019)
b2fb28a  Merge pull request #3 from apache/master (cozos, Oct 25, 2019)
9245d9b  Merge pull request #5 from apache/master (cozos, Oct 26, 2019)
2bed5b9  Use normal try catch (cozos, Oct 26, 2019)
dc48da8  Set numThreads to 40 (cozos, Nov 11, 2019)
8cbc28a  Pull out getFileSystem (cozos, Dec 11, 2019)
c52b3b0  Use toSeq instead of seq (cozos, Dec 11, 2019)
a3cdc59  Remove : Path type (cozos, Dec 11, 2019)
105235c  Revert "Pull out getFileSystem" (cozos, Dec 11, 2019)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
@@ -50,7 +51,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -734,30 +735,53 @@ object DataSource extends Logging {
    * Checks and returns files in all the paths.
    */
   private[sql] def checkAndGlobPathIfNecessary(
-      paths: Seq[String],
+      pathStrings: Seq[String],
       hadoopConf: Configuration,
       checkEmptyGlobPath: Boolean,
-      checkFilesExist: Boolean): Seq[Path] = {
-    val allGlobPath = paths.flatMap { path =>
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-      if (checkEmptyGlobPath && globPath.isEmpty) {
-        throw new AnalysisException(s"Path does not exist: $qualified")
+      checkFilesExist: Boolean,
+      numThreads: Integer = 40): Seq[Path] = {
+    val qualifiedPaths = pathStrings.map { pathString =>
+      val path = new Path(pathString)
+      val fs = path.getFileSystem(hadoopConf)
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    }
+
+    // Split the paths into glob and non glob paths, because we don't need to do an existence check
+    // for globbed paths.
+    val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath)
+
+    val globbedPaths =
+      try {
+        ThreadUtils.parmap(globPaths, "globPath", numThreads) { globPath =>
+          val fs = globPath.getFileSystem(hadoopConf)
+          val globResult = SparkHadoopUtil.get.globPath(fs, globPath)
+
+          if (checkEmptyGlobPath && globResult.isEmpty) {
+            throw new AnalysisException(s"Path does not exist: $globPath")
+          }
+
+          globResult
+        }.flatten
+      } catch {
+        case e: SparkException => throw e.getCause
Review comment (Contributor):

Is there ever a case where the cause is null?

Reply (cozos, Contributor and Author, Nov 11, 2019):

SparkException comes from ThreadUtils#parmap and ThreadUtils#awaitResult, which always seem to wrap another exception, i.e. the cause is never null.

Reply (Member):

This silently drops the caller stack trace; I opened SPARK-47833 (#46028) to fix it.
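
To make the wrap-and-unwrap pattern under discussion concrete, here is a minimal sketch. The parmap signature matches org.apache.spark.util.ThreadUtils; the helper function and its arguments are illustrative only:

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.util.ThreadUtils

// parmap runs the body on a thread pool and awaits all results; an exception
// thrown inside the body reaches the caller wrapped in a SparkException
// whose cause is the original exception.
def checkAll(paths: Seq[String], exists: String => Boolean): Unit = {
  try {
    ThreadUtils.parmap(paths, "checkPathsExist", 8) { p =>
      if (!exists(p)) {
        throw new AnalysisException(s"Path does not exist: $p")
      }
    }
  } catch {
    // Unwrap so callers see the original AnalysisException rather than the
    // wrapper. As the review notes, rethrowing getCause discards the
    // caller-side stack trace (SPARK-47833).
    case e: SparkException => throw e.getCause
  }
}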

       }
 
-      // Sufficient to check head of the globPath seq for non-glob scenario
-      // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist && !fs.exists(globPath.head)) {
-        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+    if (checkFilesExist) {
+      try {
+        ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path =>
+          val fs = path.getFileSystem(hadoopConf)
+          if (!fs.exists(path)) {
+            throw new AnalysisException(s"Path does not exist: $path")
+          }
+        }
+      } catch {
+        case e: SparkException => throw e.getCause
       }
-      globPath
     }
 
+    val allPaths = globbedPaths ++ nonGlobPaths
     if (checkFilesExist) {
-      val (filteredOut, filteredIn) = allGlobPath.partition { path =>
+      val (filteredOut, filteredIn) = allPaths.partition { path =>
         InMemoryFileIndex.shouldFilterOut(path.getName)
       }
       if (filteredIn.isEmpty) {
@@ -769,7 +793,7 @@
       }
     }
 
-    allGlobPath
+    allPaths.toSeq
   }
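
A side note on the glob/non-glob split in this hunk: SparkHadoopUtil.get.isGlobPath just scans the path string for glob metacharacters, which is what makes the partition above safe. A small illustrative sketch (the metacharacter set mirrors SparkHadoopUtil; the paths are made up):

import org.apache.hadoop.fs.Path

// Stand-in for SparkHadoopUtil.get.isGlobPath: treat a path as a glob
// if its string form contains any glob metacharacter.
def isGlobPath(p: Path): Boolean = p.toString.exists("{}[]*?\\".toSet.contains)

val paths = Seq(new Path("/data/part-0"), new Path("/data/2019-*"))
val (globPaths, nonGlobPaths) = paths.partition(isGlobPath)
// globPaths    == Seq(/data/2019-*):  expanded via globPath, in parallel
// nonGlobPaths == Seq(/data/part-0):  probed with fs.exists, in parallel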

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala (new file)
@@ -0,0 +1,193 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.test.SharedSparkSession

class DataSourceSuite extends SharedSparkSession {
  import TestPaths._

  test("test glob and non glob paths") {
    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
      Seq(
        path1.toString,
        path2.toString,
        globPath1.toString,
        globPath2.toString
      ),
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = true
    )

    assert(resultPaths.toSet === allPathsInFs.toSet)
  }

  test("test glob paths") {
    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
      Seq(
        globPath1.toString,
        globPath2.toString
      ),
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = true
    )

    assert(
      resultPaths.toSet === Set(
        globPath1Result1,
        globPath1Result2,
        globPath2Result1,
        globPath2Result2
      )
    )
  }

  test("test non glob paths") {
    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
      Seq(
        path1.toString,
        path2.toString
      ),
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = true
    )

    assert(
      resultPaths.toSet === Set(
        path1,
        path2
      )
    )
  }

  test("test non glob paths checkFilesExist=false") {
    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
      Seq(
        path1.toString,
        path2.toString,
        nonExistentPath.toString
      ),
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = false
    )

    assert(
      resultPaths.toSet === Set(
        path1,
        path2,
        nonExistentPath
      )
    )
  }

  test("test non existent paths") {
    assertThrows[AnalysisException](
      DataSource.checkAndGlobPathIfNecessary(
        Seq(
          path1.toString,
          path2.toString,
          nonExistentPath.toString
        ),
        hadoopConf,
        checkEmptyGlobPath = true,
        checkFilesExist = true
      )
    )
  }

  test("test non existent glob paths") {
    assertThrows[AnalysisException](
      DataSource.checkAndGlobPathIfNecessary(
        Seq(
          globPath1.toString,
          globPath2.toString,
          nonExistentGlobPath.toString
        ),
        hadoopConf,
        checkEmptyGlobPath = true,
        checkFilesExist = true
      )
    )
  }
}

object TestPaths {
  val hadoopConf = new Configuration()
  hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)
Review comment (Contributor):

It's not needed here, but it is worth knowing that there is a package-scoped static method FileSystem.addFileSystemForTesting that lets you add a specific instance to the file system cache. I sometimes use it to register mock file systems for existing schemes (s3, etc.).
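
For reference, a sketch of that approach. addFileSystemForTesting is package-private, so the shim has to live in the org.apache.hadoop.fs package, and the signature below is assumed from Hadoop's test utilities; check it against the Hadoop version in use:

package org.apache.hadoop.fs

import java.net.URI

import org.apache.hadoop.conf.Configuration

// Compiled into org.apache.hadoop.fs so it can reach the package-private
// test hook (assumed signature: addFileSystemForTesting(URI, Configuration,
// FileSystem)).
object FileSystemTestShim {
  // Registers a concrete FileSystem instance in the cache for the given
  // URI, e.g. so "s3://bucket" resolves to a mock during tests.
  def inject(uri: URI, conf: Configuration, fs: FileSystem): Unit =
    FileSystem.addFileSystemForTesting(uri, conf, fs)
}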


  val path1 = new Path("mockFs://mockFs/somepath1")
  val path2 = new Path("mockFs://mockFs/somepath2")
  val globPath1 = new Path("mockFs://mockFs/globpath1*")
  val globPath2 = new Path("mockFs://mockFs/globpath2*")

  val nonExistentPath = new Path("mockFs://mockFs/nonexistentpath")
  val nonExistentGlobPath = new Path("mockFs://mockFs/nonexistentpath*")

  val globPath1Result1 = new Path("mockFs://mockFs/globpath1/path1")
  val globPath1Result2 = new Path("mockFs://mockFs/globpath1/path2")
  val globPath2Result1 = new Path("mockFs://mockFs/globpath2/path1")
  val globPath2Result2 = new Path("mockFs://mockFs/globpath2/path2")

  val allPathsInFs = Seq(
    path1,
    path2,
    globPath1Result1,
    globPath1Result2,
    globPath2Result1,
    globPath2Result2
  )

  val mockGlobResults: Map[Path, Array[FileStatus]] = Map(
    globPath1 ->
      Array(
        createMockFileStatus(globPath1Result1.toString),
        createMockFileStatus(globPath1Result2.toString)
      ),
    globPath2 ->
      Array(
        createMockFileStatus(globPath2Result1.toString),
        createMockFileStatus(globPath2Result2.toString)
      )
  )

  def createMockFileStatus(path: String): FileStatus = {
    val fileStatus = new FileStatus()
    fileStatus.setPath(new Path(path))
    fileStatus
  }
}

class MockFileSystem extends RawLocalFileSystem {
  import TestPaths._

  override def exists(f: Path): Boolean = {
    allPathsInFs.contains(f)
  }

  override def globStatus(pathPattern: Path): Array[FileStatus] = {
    mockGlobResults.getOrElse(pathPattern, Array())
  }
}
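
The mock is wired in purely through configuration: Hadoop resolves a URI scheme to a FileSystem implementation via the fs.<scheme>.impl key, so any path with the mockFs scheme resolves to MockFileSystem. A minimal sketch of that resolution using the fixtures above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
conf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)

// getFileSystem consults fs.mockFs.impl and instantiates MockFileSystem.
val fs = new Path("mockFs://mockFs/globpath1*").getFileSystem(conf)
assert(fs.isInstanceOf[MockFileSystem])

// The stubbed globStatus answers only for the patterns in mockGlobResults.
val statuses = fs.globStatus(new Path("mockFs://mockFs/globpath1*"))
assert(statuses.map(_.getPath.toString).toSet ==
  Set("mockFs://mockFs/globpath1/path1", "mockFs://mockFs/globpath1/path2"))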