@@ -109,6 +109,9 @@ case class DataSource(
 
   private def providingInstance() = providingClass.getConstructor().newInstance()
 
+  private def newHadoopConfiguration(): Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(options)
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -230,7 +233,7 @@ case class DataSource(
       // once the streaming job starts and some upstream source starts dropping data.
       val hdfsPath = new Path(path)
       if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
-        val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
         if (!fs.exists(hdfsPath)) {
           throw new AnalysisException(s"Path does not exist: $path")
         }
@@ -357,7 +360,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf(),
+            newHadoopConfiguration(),
             sparkSession.sessionState.conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
@@ -449,7 +452,7 @@ case class DataSource(
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
     val outputPath = if (allPaths.length == 1) {
       val path = new Path(allPaths.head)
-      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val fs = path.getFileSystem(newHadoopConfiguration())
       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     } else {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -569,9 +572,7 @@ case class DataSource(
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
       checkEmptyGlobPath, checkFilesExist)
   }
 }
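Note on the helper: newHadoopConfiguration() delegates to SessionState.newHadoopConfWithOptions(options). As a rough mental model (a minimal sketch, not Spark's actual implementation), it behaves like cloning the session-level Hadoop configuration and overlaying the data source options on top, which is why a setting such as fs.defaultFS passed via .option(...) now reaches every getFileSystem call in this file:

// Illustrative sketch only; the helper name below is hypothetical.
import org.apache.hadoop.conf.Configuration

def newHadoopConfWithOptionsSketch(
    sessionHadoopConf: Configuration,
    options: Map[String, String]): Configuration = {
  val conf = new Configuration(sessionHadoopConf)   // copy the session-level Hadoop conf
  options.foreach { case (k, v) => conf.set(k, v) } // data source options win on conflict
  conf
}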
@@ -843,6 +843,26 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    Seq("parquet", "").foreach { format =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+        withTempDir { dir =>
+          val path = dir.getCanonicalPath
+          val defaultFs = "nonexistFS://nonexistFS"
+          val expectMessage = "No FileSystem for scheme nonexistFS"
+          val message1 = intercept[java.io.IOException] {
+            spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message1.filterNot(Set(':', '"').contains) == expectMessage)
+          val message2 = intercept[java.io.IOException] {
+            spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message2.filterNot(Set(':', '"').contains) == expectMessage)
+        }
+      }
+    }
+  }
+
   test("SPARK-31116: Select nested schema with case insensitive mode") {
     // This test case failed at only Parquet. ORC is added for test coverage parity.
     Seq("orc", "parquet").foreach { format =>
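A note on the Seq("parquet", "") loop in the test above: it sets SQLConf.USE_V1_SOURCE_LIST (spark.sql.sources.useV1SourceList) to each value, so the same assertions run once against the v1 FileFormat path and once against the DataSource V2 path. A minimal standalone sketch of that toggle (not part of the change itself):

// Hypothetical illustration of the conf the test flips.
spark.conf.set("spark.sql.sources.useV1SourceList", "parquet") // Parquet routed through the v1 path
spark.conf.set("spark.sql.sources.useV1SourceList", "")        // Parquet routed through the v2 path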
@@ -532,6 +532,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme nonexistFS"
+      val message = intercept[java.io.IOException] {
+        spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+      }.getMessage
+      assert(message.filterNot(Set(':', '"').contains) == expectMessage)
+    }
+  }
+
   test("read from textfile") {
     withTempDirs { case (src, tmp) =>
       val textStream = spark.readStream.textFile(src.getCanonicalPath)
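Taken together, the change makes Hadoop file system settings supplied as data source options effective for batch reads, batch writes, and streaming reads. A hedged usage sketch (host names, bucket, and credential values are placeholders, not from the PR):

// Per-query Hadoop conf via options; keys and values below are illustrative only.
val df = spark.read
  .option("fs.defaultFS", "hdfs://nn1:8020")   // honored when resolving the input path
  .parquet("/warehouse/events")

df.write
  .option("fs.s3a.access.key", "<access-key>") // assumption: any Hadoop fs.* key can be passed this way
  .option("fs.s3a.secret.key", "<secret-key>")
  .parquet("s3a://my-bucket/events_copy")

val stream = spark.readStream
  .option("fs.defaultFS", "hdfs://nn1:8020")
  .text("/incoming/logs")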