
Commit f3771c6

[SPARK-31935][SQL] Hadoop file system config should be effective in data source options
### What changes were proposed in this pull request?

Make the Hadoop file system config effective in data source options.

From `org.apache.hadoop.fs.FileSystem.java`:
```java
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}
```

Before this change, the file system configurations in data source options were not propagated in `DataSource.scala`. After this change, we can specify authority and URI scheme related configurations for scanning file systems.

This problem only exists in data source V1. In V2, `sparkSession.sessionState.newHadoopConfWithOptions(options)` is already used in `FileTable`.

### Why are the changes needed?

Allow users to specify authority and URI scheme related Hadoop configurations for file source reading.

### Does this PR introduce _any_ user-facing change?

Yes, the file system related Hadoop configurations in data source options will take effect on reading.

### How was this patch tested?

Unit test.

Closes apache#28760 from gengliangwang/ds_conf.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
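As a usage sketch of the new behavior (the S3A endpoint, bucket, and credentials below are hypothetical placeholders, not from this PR), a Hadoop file system setting can now be supplied per read on the V1 path:

```scala
// Hypothetical usage sketch: the endpoint, bucket, and credentials are
// placeholders. With this change, Hadoop FS settings passed as data source
// options reach FileSystem.get on the V1 path instead of being dropped.
val df = spark.read
  .option("fs.s3a.endpoint", "http://localhost:9000") // assumed local endpoint
  .option("fs.s3a.access.key", "<access-key>")        // placeholder credential
  .option("fs.s3a.secret.key", "<secret-key>")        // placeholder credential
  .parquet("s3a://example-bucket/events")
```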
1 parent 6a424b9

File tree

4 files changed (+52, -7 lines changed)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 7 additions & 6 deletions

```diff
@@ -110,6 +110,9 @@ case class DataSource(
 
   private def providingInstance() = providingClass.getConstructor().newInstance()
 
+  private def newHadoopConfiguration(): Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(options)
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -231,7 +234,7 @@ case class DataSource(
         // once the streaming job starts and some upstream source starts dropping data.
         val hdfsPath = new Path(path)
         if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
           if (!fs.exists(hdfsPath)) {
             throw new AnalysisException(s"Path does not exist: $path")
           }
@@ -358,7 +361,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf(),
+            newHadoopConfiguration(),
             sparkSession.sessionState.conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
@@ -450,7 +453,7 @@ case class DataSource(
       val allPaths = paths ++ caseInsensitiveOptions.get("path")
       val outputPath = if (allPaths.length == 1) {
         val path = new Path(allPaths.head)
-        val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        val fs = path.getFileSystem(newHadoopConfiguration())
         path.makeQualified(fs.getUri, fs.getWorkingDirectory)
       } else {
         throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -570,9 +573,7 @@ case class DataSource(
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
       checkEmptyGlobPath, checkFilesExist)
   }
 }
```
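The new `newHadoopConfiguration()` helper simply delegates to `sparkSession.sessionState.newHadoopConfWithOptions(options)`. As a minimal sketch of the overlay semantics assumed here (not Spark's actual implementation), this amounts to copying the session-level Hadoop configuration and letting the per-query options take precedence:

```scala
import org.apache.hadoop.conf.Configuration

// Minimal sketch, assuming overlay semantics: copy the session-level Hadoop
// conf so it stays untouched, then apply per-query options on top.
def hadoopConfWithOptions(
    sessionConf: Configuration,
    options: Map[String, String]): Configuration = {
  val conf = new Configuration(sessionConf)         // defensive copy
  options.foreach { case (k, v) => conf.set(k, v) } // per-query options win
  conf
}
```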

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 20 additions & 0 deletions

```diff
@@ -843,6 +843,26 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    Seq("parquet", "").foreach { format =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+        withTempDir { dir =>
+          val path = dir.getCanonicalPath
+          val defaultFs = "nonexistFS://nonexistFS"
+          val expectMessage = "No FileSystem for scheme: nonexistFS"
+          val message1 = intercept[java.io.IOException] {
+            spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message1 == expectMessage)
+          val message2 = intercept[java.io.IOException] {
+            spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message2 == expectMessage)
+        }
+      }
+    }
+  }
+
   test("SPARK-31116: Select nested schema with case insensitive mode") {
     // This test case failed at only Parquet. ORC is added for test coverage parity.
     Seq("orc", "parquet").foreach { format =>
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala

Lines changed: 13 additions & 1 deletion

```diff
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.test.SharedSparkSession
 
-class DataSourceSuite extends SharedSparkSession {
+class DataSourceSuite extends SharedSparkSession with PrivateMethodTester {
   import TestPaths._
 
   test("test glob and non glob paths") {
@@ -132,6 +133,17 @@ class DataSourceSuite extends SharedSparkSession {
       )
     )
   }
+
+  test("Data source options should be propagated in method checkAndGlobPathIfNecessary") {
+    val dataSourceOptions = Map("fs.defaultFS" -> "nonexistsFs://nonexistsFs")
+    val dataSource = DataSource(spark, "parquet", Seq("/path3"), options = dataSourceOptions)
+    val checkAndGlobPathIfNecessary = PrivateMethod[Seq[Path]]('checkAndGlobPathIfNecessary)
+
+    val message = intercept[java.io.IOException] {
+      dataSource invokePrivate checkAndGlobPathIfNecessary(false, false)
+    }.getMessage
+    assert(message.equals("No FileSystem for scheme: nonexistsFs"))
+  }
 }
 
 object TestPaths {
```
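For readers unfamiliar with it, ScalaTest's `PrivateMethodTester` (used above to reach the private `checkAndGlobPathIfNecessary`) works by reflection: `PrivateMethod[R]` names the target method and `invokePrivate` calls it. A self-contained sketch with illustrative names:

```scala
import org.scalatest.PrivateMethodTester

class Greeter {
  private def greet(name: String): String = s"hello, $name"
}

// Illustrative class/method names; only the PrivateMethodTester pattern is real.
object PrivateMethodDemo extends PrivateMethodTester {
  def main(args: Array[String]): Unit = {
    val greet = PrivateMethod[String](Symbol("greet")) // capture the method name
    println(new Greeter invokePrivate greet("spark"))  // reflective call: "hello, spark"
  }
}
```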

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 12 additions & 0 deletions

```diff
@@ -532,6 +532,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val message = intercept[java.io.IOException] {
+        spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+      }.getMessage
+      assert(message == expectMessage)
+    }
+  }
+
   test("read from textfile") {
     withTempDirs { case (src, tmp) =>
       val textStream = spark.readStream.textFile(src.getCanonicalPath)
```
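Note also, from the `FileSystem.get` code quoted in the commit message, that `fs.<scheme>.impl.disable.cache` is consulted on the same path, so a query or stream can now opt out of Hadoop's shared FileSystem cache via an option. A hedged sketch (the namenode address and path are illustrative):

```scala
// Sketch: FileSystem.get checks fs.<scheme>.impl.disable.cache, so passing it
// as an option bypasses the process-wide FS cache for this stream only.
// The namenode address and path are illustrative.
val lines = spark.readStream
  .option("fs.hdfs.impl.disable.cache", "true")
  .text("hdfs://namenode:8020/incoming")
```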
