From f6cca6b5163acef655d0c0e3d6cd4848b00314e0 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 9 Jun 2020 12:15:07 -0700
Subject: [PATCH 1/3] [SPARK-31935][SQL] Hadoop file system config should be effective in data source options

### What changes were proposed in this pull request?

Make Hadoop file system configs effective in data source options. From `org.apache.hadoop.fs.FileSystem`:

```
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}
```

Before the changes, the file system configurations in data source options were not propagated in `DataSource.scala`. After the changes, we can specify authority- and URI-scheme-related configurations for scanning file systems.

This problem only exists in data source V1. In V2, we already use `sparkSession.sessionState.newHadoopConfWithOptions(options)` in `FileTable`.

### Why are the changes needed?

Allow users to specify authority- and URI-scheme-related Hadoop configurations for file source reading.

### Does this PR introduce _any_ user-facing change?

Yes, file-system-related Hadoop configurations in data source options now take effect on reads.

### How was this patch tested?

Unit test

Closes #28760 from gengliangwang/ds_conf.

Authored-by: Gengliang Wang
Signed-off-by: Gengliang Wang
---
 .../execution/datasources/DataSource.scala    | 13 ++++++------
 .../spark/sql/FileBasedDataSourceSuite.scala  | 20 +++++++++++++++++++
 .../sql/streaming/FileStreamSourceSuite.scala | 12 +++++++++++
 3 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3615afcf86c7..588a9b4eb4d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -109,6 +109,9 @@ case class DataSource(
 
   private def providingInstance() = providingClass.getConstructor().newInstance()
 
+  private def newHadoopConfiguration(): Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(options)
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -230,7 +233,7 @@ case class DataSource(
         // once the streaming job starts and some upstream source starts dropping data.
         val hdfsPath = new Path(path)
         if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
           if (!fs.exists(hdfsPath)) {
             throw new AnalysisException(s"Path does not exist: $path")
           }
@@ -357,7 +360,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf(),
+            newHadoopConfiguration(),
             sparkSession.sessionState.conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
@@ -449,7 +452,7 @@ case class DataSource(
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
     val outputPath = if (allPaths.length == 1) {
       val path = new Path(allPaths.head)
-      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val fs = path.getFileSystem(newHadoopConfiguration())
       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     } else {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -569,9 +572,7 @@ case class DataSource(
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
       checkEmptyGlobPath, checkFilesExist)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index cb410b4f0d7d..efc7cac6a5f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -843,6 +843,26 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    Seq("parquet", "").foreach { format =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+        withTempDir { dir =>
+          val path = dir.getCanonicalPath
+          val defaultFs = "nonexistFS://nonexistFS"
+          val expectMessage = "No FileSystem for scheme: nonexistFS"
+          val message1 = intercept[java.io.IOException] {
+            spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message1 == expectMessage)
+          val message2 = intercept[java.io.IOException] {
+            spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message2 == expectMessage)
+        }
+      }
+    }
+  }
+
   test("SPARK-31116: Select nested schema with case insensitive mode") {
     // This test case failed at only Parquet. ORC is added for test coverage parity.
     Seq("orc", "parquet").foreach { format =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa320333143e..32dceaac7059 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -532,6 +532,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val message = intercept[java.io.IOException] {
+        spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+      }.getMessage
+      assert(message == expectMessage)
+    }
+  }
+
   test("read from textfile") {
     withTempDirs { case (src, tmp) =>
       val textStream = spark.readStream.textFile(src.getCanonicalPath)

From 5623228fd0b095141a138f753b13fd77ae422ab3 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 10 Jun 2020 17:36:32 -0700
Subject: [PATCH 2/3] [SPARK-31935][SQL][TESTS][FOLLOWUP] Fix the test case for Hadoop2/3

### What changes were proposed in this pull request?

This PR updates the test case to accept the Hadoop 2/3 error messages correctly.

### Why are the changes needed?

SPARK-31935 (https://github.com/apache/spark/pull/28760) breaks the Hadoop 3.2 UT because Hadoop 2 and Hadoop 3 have different exception messages.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the Jenkins tests with both Hadoop 2 and Hadoop 3, or run the following manually.

**Hadoop 2.7**
```
$ build/sbt "sql/testOnly *.FileBasedDataSourceSuite -- -z SPARK-31935"
...
[info] All tests passed.
```

**Hadoop 3.2**
```
$ build/sbt "sql/testOnly *.FileBasedDataSourceSuite -- -z SPARK-31935" -Phadoop-3.2
...
[info] All tests passed.
```

Closes #28791 from dongjoon-hyun/SPARK-31935.
Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 .../org/apache/spark/sql/FileBasedDataSourceSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index efc7cac6a5f2..d8157d3c779b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -849,15 +849,15 @@ class FileBasedDataSourceSuite extends QueryTest
       withTempDir { dir =>
         val path = dir.getCanonicalPath
         val defaultFs = "nonexistFS://nonexistFS"
-        val expectMessage = "No FileSystem for scheme: nonexistFS"
+        val expectMessage = "No FileSystem for scheme nonexistFS"
         val message1 = intercept[java.io.IOException] {
           spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
         }.getMessage
-        assert(message1 == expectMessage)
+        assert(message1.filterNot(Set(':', '"').contains) == expectMessage)
         val message2 = intercept[java.io.IOException] {
           spark.read.option("fs.defaultFS", defaultFs).parquet(path)
         }.getMessage
-        assert(message2 == expectMessage)
+        assert(message2.filterNot(Set(':', '"').contains) == expectMessage)
       }
     }
   }

From da8d48d7984dd523f44c564ead9f7d5fb9cdd4ef Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 10 Jun 2020 20:59:48 -0700
Subject: [PATCH 3/3] [SPARK-31935][SQL][TESTS][FOLLOWUP] Fix the test case for Hadoop2/3

### What changes were proposed in this pull request?

This PR updates the test cases to accept the Hadoop 2/3 error messages correctly.

### Why are the changes needed?

SPARK-31935 (#28760) breaks the Hadoop 3.2 UT because Hadoop 2 and Hadoop 3 have different exception messages. In https://github.com/apache/spark/pull/28791, two test suites were missed by the fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test

Closes #28796 from gengliangwang/SPARK-31926-followup.

Authored-by: Gengliang Wang
Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 32dceaac7059..7b16aebc531f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -536,11 +536,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val defaultFs = "nonexistFS://nonexistFS"
-      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val expectMessage = "No FileSystem for scheme nonexistFS"
       val message = intercept[java.io.IOException] {
         spark.readStream.option("fs.defaultFS", defaultFs).text(path)
       }.getMessage
-      assert(message.filterNot(Set(':', '"').contains) == expectMessage)
+      assert(message.filterNot(Set(':', '"').contains) == expectMessage)
     }
   }
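
---

Editor's note — usage illustration (not part of the patches above): after SPARK-31935, Hadoop file system settings supplied as data source options are merged into the Hadoop `Configuration` used to resolve and scan the file system on the DataSource V1 path as well, via `sparkSession.sessionState.newHadoopConfWithOptions(options)`. The sketch below shows the pattern from a user's perspective; the bucket name, path, and credential lookups are hypothetical placeholders, and the `fs.s3a.*` keys assume the hadoop-aws module is on the classpath.

```
// Minimal sketch: per-source Hadoop FS config via data source options.
// "examplebucket" and the env-var credential lookups are hypothetical.
import org.apache.spark.sql.SparkSession

object HadoopConfInOptionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HadoopConfInOptionsExample")
      .getOrCreate()

    // With the fix, these options reach the Hadoop Configuration that
    // DataSource.scala uses to resolve the file system for this read only,
    // instead of being ignored on the V1 path as before.
    val df = spark.read
      .option("fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
      .option("fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
      .parquet("s3a://examplebucket/events")

    df.show()
    spark.stop()
  }
}
```

The tests in the patches exercise the same mechanism negatively: pointing `fs.defaultFS` at a nonexistent scheme and asserting on the resulting `IOException` confirms that the option actually reached file system resolution.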