From e5b5751e82ed10fa69f99d5221bf0924d41e256a Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 8 Jun 2020 15:59:23 -0700 Subject: [PATCH 1/7] Data source options should be propagated in method checkAndGlobPathIfNecessary --- .../sql/execution/datasources/DataSource.scala | 2 +- .../execution/datasources/DataSourceSuite.scala | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 222fea652826..3879d75be2b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -570,7 +570,7 @@ case class DataSource( checkEmptyGlobPath: Boolean, checkFilesExist: Boolean): Seq[Path] = { val allPaths = caseInsensitiveOptions.get("path") ++ paths - val hadoopConf = sparkSession.sessionState.newHadoopConf() + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf, checkEmptyGlobPath, checkFilesExist) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 1e3c660e0945..434fdcdb438c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} +import org.scalatest.PrivateMethodTester import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.test.SharedSparkSession -class DataSourceSuite extends SharedSparkSession { +class DataSourceSuite extends SharedSparkSession with PrivateMethodTester { import TestPaths._ test("test glob and non glob paths") { @@ -132,6 +133,18 @@ class DataSourceSuite extends SharedSparkSession { ) ) } + + test("Data source options should be propagated in method checkAndGlobPathIfNecessary") { + val fsDefaultName = "mockFs://mockFs" + val dataSource = + DataSource(spark, "parquet", Seq("/path1"), options = Map("fs.defaultFS" -> fsDefaultName)) + val checkAndGlobPathIfNecessary = PrivateMethod[Seq[Path]]('checkAndGlobPathIfNecessary) + + val message = intercept[java.io.IOException] { + dataSource invokePrivate checkAndGlobPathIfNecessary(false, false) + }.getMessage + assert(message.equals("No FileSystem for scheme: mockFs")) + } } object TestPaths { From 0dada86535ee680d37fc472966225b614d980b50 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 8 Jun 2020 16:27:30 -0700 Subject: [PATCH 2/7] revise test case --- .../spark/sql/execution/datasources/DataSourceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 434fdcdb438c..15f1e58a5677 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -135,9 +135,9 @@ class DataSourceSuite extends SharedSparkSession with PrivateMethodTester { } test("Data source options should be propagated in method checkAndGlobPathIfNecessary") { - val fsDefaultName = "mockFs://mockFs" + val dataSourceOptions = Map("fs.defaultFS" -> "mockFs://mockFs") val dataSource = - DataSource(spark, "parquet", Seq("/path1"), options = Map("fs.defaultFS" -> fsDefaultName)) + DataSource(spark, "parquet", Seq("/path1"), options = dataSourceOptions) val checkAndGlobPathIfNecessary = PrivateMethod[Seq[Path]]('checkAndGlobPathIfNecessary) val message = intercept[java.io.IOException] { From 0513869d84b3edbdaa9a44488c5d3b5f6ba71f90 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 8 Jun 2020 16:43:36 -0700 Subject: [PATCH 3/7] fix other hadoop conf --- .../spark/sql/execution/datasources/DataSource.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 3879d75be2b0..b8b791c225c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -110,6 +110,9 @@ case class DataSource( private def providingInstance() = providingClass.getConstructor().newInstance() + private def newHadoopConfiguration(): Configuration = + sparkSession.sessionState.newHadoopConfWithOptions(options) + lazy val sourceInfo: SourceInfo = sourceSchema() private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver @@ -231,7 +234,7 @@ case class DataSource( // once the streaming job starts and some upstream source starts dropping data. val hdfsPath = new Path(path) if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) { - val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + val fs = hdfsPath.getFileSystem(newHadoopConfiguration()) if (!fs.exists(hdfsPath)) { throw new AnalysisException(s"Path does not exist: $path") } @@ -358,7 +361,7 @@ case class DataSource( case (format: FileFormat, _) if FileStreamSink.hasMetadata( caseInsensitiveOptions.get("path").toSeq ++ paths, - sparkSession.sessionState.newHadoopConf(), + newHadoopConfiguration(), sparkSession.sessionState.conf) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, @@ -450,7 +453,7 @@ case class DataSource( val allPaths = paths ++ caseInsensitiveOptions.get("path") val outputPath = if (allPaths.length == 1) { val path = new Path(allPaths.head) - val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) + val fs = path.getFileSystem(newHadoopConfiguration()) path.makeQualified(fs.getUri, fs.getWorkingDirectory) } else { throw new IllegalArgumentException("Expected exactly one path to be specified, but " + From 8aa11cc70dfc1f308a3872d1a9d6b67b2ec50b9b Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 8 Jun 2020 17:03:22 -0700 Subject: [PATCH 4/7] add test cases --- .../spark/sql/FileBasedDataSourceSuite.scala | 20 +++++++++++++++++++ .../sql/streaming/FileStreamSourceSuite.scala | 12 +++++++++++ 2 files changed, 32 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index cb410b4f0d7d..fe4c65ff2394 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -843,6 +843,26 @@ class FileBasedDataSourceSuite extends QueryTest } } + test("SPARK-31935: Hadoop file system related data source options should be effective") { + Seq("parquet", "").foreach { format => + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { + withTempDir { dir => + val path = dir.getCanonicalPath + val defaultFs = "nonexistFS://nonexistFS" + val expectMessage = "No FileSystem for scheme: nonexistFS" + val message1 = intercept[java.io.IOException] { + spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path) + }.getMessage + assert(message1 == expectMessage) + val message2 = intercept[java.io.IOException] { + spark.read.option("fs.defaultFS", defaultFs).parquet(path) + }.getMessage + assert(message2 == expectMessage) + } + } + } + } + test("SPARK-31116: Select nested schema with case insensitive mode") { // This test case failed at only Parquet. ORC is added for test coverage parity. Seq("orc", "parquet").foreach { format => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index fa320333143e..51d7e8deb563 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -532,6 +532,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("SPARK-31935: Hadoop file system related data source options should be effective") { + withTempDir { dir => + val path = dir.getCanonicalPath + val defaultFs = "nonexistFS://nonexistFS" + val expectMessage = "No FileSystem for scheme: nonexistFS" + val message = intercept[java.io.IOException] { + spark.readStream.option("fs.defaultFS", defaultFs).text(path) + }.getMessage + assert(message == expectMessage) + } + } + test("read from textfile") { withTempDirs { case (src, tmp) => val textStream = spark.readStream.textFile(src.getCanonicalPath) From 806b45b08043b3266c21c9c4674f83e9d2f2fac2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 8 Jun 2020 17:06:31 -0700 Subject: [PATCH 5/7] revise test case name --- .../scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala | 2 +- .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index fe4c65ff2394..efc7cac6a5f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -843,7 +843,7 @@ class FileBasedDataSourceSuite extends QueryTest } } - test("SPARK-31935: Hadoop file system related data source options should be effective") { + test("SPARK-31935: Hadoop file system config should be effective in data source options") { Seq("parquet", "").foreach { format => withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { withTempDir { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 51d7e8deb563..32dceaac7059 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -532,7 +532,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } - test("SPARK-31935: Hadoop file system related data source options should be effective") { + test("SPARK-31935: Hadoop file system config should be effective in data source options") { withTempDir { dir => val path = dir.getCanonicalPath val defaultFs = "nonexistFS://nonexistFS" From 922efd52016115b2d9fd9fbf04de678e9f423cc2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 8 Jun 2020 17:10:35 -0700 Subject: [PATCH 6/7] revise --- .../apache/spark/sql/execution/datasources/DataSource.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index b8b791c225c0..07d7c4e97a09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -573,9 +573,7 @@ case class DataSource( checkEmptyGlobPath: Boolean, checkFilesExist: Boolean): Seq[Path] = { val allPaths = caseInsensitiveOptions.get("path") ++ paths - val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) - - DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf, + DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(), checkEmptyGlobPath, checkFilesExist) } } From 4d88604a0f73dc0b1a6547c93c9e6de6d6e71a8a Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 8 Jun 2020 20:08:08 -0700 Subject: [PATCH 7/7] fix test failure --- .../spark/sql/execution/datasources/DataSourceSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 15f1e58a5677..9345158fd07e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -135,15 +135,14 @@ class DataSourceSuite extends SharedSparkSession with PrivateMethodTester { } test("Data source options should be propagated in method checkAndGlobPathIfNecessary") { - val dataSourceOptions = Map("fs.defaultFS" -> "mockFs://mockFs") - val dataSource = - DataSource(spark, "parquet", Seq("/path1"), options = dataSourceOptions) + val dataSourceOptions = Map("fs.defaultFS" -> "nonexistsFs://nonexistsFs") + val dataSource = DataSource(spark, "parquet", Seq("/path3"), options = dataSourceOptions) val checkAndGlobPathIfNecessary = PrivateMethod[Seq[Path]]('checkAndGlobPathIfNecessary) val message = intercept[java.io.IOException] { dataSource invokePrivate checkAndGlobPathIfNecessary(false, false) }.getMessage - assert(message.equals("No FileSystem for scheme: mockFs")) + assert(message.equals("No FileSystem for scheme: nonexistsFs")) } }