From 1fe95be9b32bacb8198007544a1cc1b197f20786 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 2 Jul 2020 06:09:54 +0800
Subject: [PATCH] [SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/28760 to fix the remaining issues:
1. should consider data source options when refreshing cache by path at the end of `InsertIntoHadoopFsRelationCommand`
2. should consider data source options when inferring schema for file source
3. should consider data source options when getting the qualified path in file source v2.

### Why are the changes needed?

We didn't catch these issues in https://github.com/apache/spark/pull/28760 because its test case only checks for an error when initializing the file system. If the file system is initialized multiple times during a single read/write action, the test only exercises the first initialization.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Rewrite the test to make sure the entire data source read/write action can succeed.

Closes #28948 from cloud-fan/fix.

Authored-by: Wenchen Fan
Signed-off-by: Gengliang Wang
(cherry picked from commit 6edb20df834f7f9b85c1559ef78a3d0d2272e4df)
Signed-off-by: Gengliang Wang
---
 .../spark/sql/execution/CacheManager.scala      | 17 ++++++----
 .../InsertIntoHadoopFsRelationCommand.scala     |  2 +-
 .../parquet/ParquetFileFormat.scala             |  6 ++--
 .../spark/sql/FileBasedDataSourceSuite.scala    | 32 +++++++++++--------
 .../spark/sql/hive/orc/OrcFileFormat.scala      |  2 +-
 5 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 5b3059652922..93a80c70f5d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -237,13 +237,18 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = writeLock {
-    val (fs, qualifiedPath) = {
-      val path = new Path(resourcePath)
-      val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-      (fs, fs.makeQualified(path))
-    }
+  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
+    val path = new Path(resourcePath)
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    recacheByPath(spark, path, fs)
+  }
 
+  /**
+   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = writeLock {
+    val qualifiedPath = fs.makeQualified(resourcePath)
     recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 484942d35c85..6125b7987009 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -184,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
+      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)
 
       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 16cd570901c2..ad84a9f30582 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -238,7 +238,7 @@ class ParquetFileFormat
         .orElse(filesByType.data.headOption)
         .toSeq
     }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
   }
 
   case class FileTypes(
@@ -569,11 +569,13 @@ object ParquetFileFormat extends Logging {
    * S3 nodes).
    */
   def mergeSchemasInParallel(
+      parameters: Map[String, String],
       filesToTouch: Seq[FileStatus],
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+    val serializedConf = new SerializableConfiguration(
+      sparkSession.sessionState.newHadoopConfWithOptions(parameters))
 
     // !! HACK ALERT !!
     //
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index dd6128ac167d..e6684bee9227 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.sql
 
 import java.io.{File, FileNotFoundException}
+import java.net.URI
 import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
@@ -479,18 +481,15 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   test("SPARK-31935: Hadoop file system config should be effective in data source options") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val defaultFs = "nonexistFS://nonexistFS"
-      val expectMessage = "No FileSystem for scheme: nonexistFS"
-      val message1 = intercept[java.io.IOException] {
-        spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message1 == expectMessage)
-      val message2 = intercept[java.io.IOException] {
-        spark.read.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message2 == expectMessage)
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempDir { dir =>
+        val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
+        spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
+        checkAnswer(
+          spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
+      }
     }
   }
 
@@ -543,3 +542,10 @@ object TestingUDT {
     override def userClass: Class[NullData] = classOf[NullData]
   }
 }
+
+class FakeFileSystemRequiringDSOption extends LocalFileSystem {
+  override def initialize(name: URI, conf: Configuration): Unit = {
+    super.initialize(name, conf)
+    require(conf.get("ds_option", "") == "value")
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 8cf344bb71c4..b63215ac93c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     OrcFileOperator.readSchema(
       files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf()),
+      Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
       ignoreCorruptFiles
     )
   }
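
For illustration, a minimal end-to-end sketch of the behavior the new test exercises: Hadoop file system properties passed as data source options are merged into the Hadoop `Configuration` that the Parquet file source uses for writing, schema inference, and cache refresh. The output path and the `fs.file.impl.disable.cache` property below are only placeholders; any `fs.*` property can be passed the same way.

```scala
import org.apache.spark.sql.SparkSession

object DataSourceOptionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("hadoop-conf-via-ds-options")
      .getOrCreate()

    // Hypothetical output location; any Hadoop-supported URI works the same way.
    val path = "file:/tmp/spark-ds-option-sketch"

    // The per-query option below reaches the Hadoop Configuration built for this write
    // (and, with this patch, also the cache refresh done by recacheByPath).
    spark.range(10).write
      .option("fs.file.impl.disable.cache", "true")
      .mode("overwrite")
      .parquet(path)

    // The same option on the read side is applied when inferring the Parquet schema.
    spark.read
      .option("fs.file.impl.disable.cache", "true")
      .parquet(path)
      .show()

    spark.stop()
  }
}
```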