diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 5b3059652922..93a80c70f5d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -237,13 +237,18 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = writeLock {
-    val (fs, qualifiedPath) = {
-      val path = new Path(resourcePath)
-      val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-      (fs, fs.makeQualified(path))
-    }
+  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
+    val path = new Path(resourcePath)
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    recacheByPath(spark, path, fs)
+  }
 
+  /**
+   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = writeLock {
+    val qualifiedPath = fs.makeQualified(resourcePath)
     recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 484942d35c85..6125b7987009 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -184,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
+      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)
 
       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 16cd570901c2..ad84a9f30582 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -238,7 +238,7 @@ class ParquetFileFormat
           .orElse(filesByType.data.headOption)
           .toSeq
       }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
   }
 
   case class FileTypes(
@@ -569,11 +569,13 @@ object ParquetFileFormat extends Logging {
   *     S3 nodes).
   */
  def mergeSchemasInParallel(
+      parameters: Map[String, String],
      filesToTouch: Seq[FileStatus],
      sparkSession: SparkSession): Option[StructType] = {
    val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
    val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+    val serializedConf = new SerializableConfiguration(
+      sparkSession.sessionState.newHadoopConfWithOptions(parameters))
 
     // !! HACK ALERT !!
     //
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index dd6128ac167d..e6684bee9227 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.sql
 
 import java.io.{File, FileNotFoundException}
+import java.net.URI
 import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
@@ -479,18 +481,15 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   test("SPARK-31935: Hadoop file system config should be effective in data source options") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val defaultFs = "nonexistFS://nonexistFS"
-      val expectMessage = "No FileSystem for scheme: nonexistFS"
-      val message1 = intercept[java.io.IOException] {
-        spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message1 == expectMessage)
-      val message2 = intercept[java.io.IOException] {
-        spark.read.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message2 == expectMessage)
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempDir { dir =>
+        val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
+        spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
+        checkAnswer(
+          spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
+      }
     }
   }
 
@@ -543,3 +542,10 @@ object TestingUDT {
     override def userClass: Class[NullData] = classOf[NullData]
   }
 }
+
+class FakeFileSystemRequiringDSOption extends LocalFileSystem {
+  override def initialize(name: URI, conf: Configuration): Unit = {
+    super.initialize(name, conf)
+    require(conf.get("ds_option", "") == "value")
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 8cf344bb71c4..b63215ac93c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     OrcFileOperator.readSchema(
       files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf()),
+      Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
       ignoreCorruptFiles
     )
   }
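For reference, a minimal usage sketch of the behavior this patch enables: Hadoop filesystem settings passed as data source options are merged into the Hadoop Configuration used for Parquet/ORC schema inference, the insert command, and the cache refresh that follows it. This sketch is not part of the patch; the bucket name, credential placeholders, and object name below are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical end-to-end example: per-query S3A credentials supplied as
// DataFrameReader/DataFrameWriter options rather than cluster-wide hadoopConf.
object HadoopConfViaOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SPARK-31935 usage sketch").getOrCreate()

    // Options prefixed with "fs." reach the FileSystem created for this read,
    // including the one used during schema inference.
    val input = spark.read
      .option("fs.s3a.access.key", "<access-key>") // placeholder value
      .option("fs.s3a.secret.key", "<secret-key>") // placeholder value
      .parquet("s3a://example-bucket/input")

    // The same options are honored on the write path and the post-insert
    // cache refresh handled by InsertIntoHadoopFsRelationCommand.
    input.write
      .option("fs.s3a.access.key", "<access-key>")
      .option("fs.s3a.secret.key", "<secret-key>")
      .mode("overwrite")
      .parquet("s3a://example-bucket/output")

    spark.stop()
  }
}
```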