
Commit 7f4d452

cloud-fan authored and HyukjinKwon committed
[SPARK-31935][2.4][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options
### What changes were proposed in this pull request?

Backport of #28948. This is a followup of #28760 to fix the remaining issues:
1. should consider data source options when refreshing cache by path at the end of `InsertIntoHadoopFsRelationCommand`
2. should consider data source options when inferring schema for file source
3. should consider data source options when getting the qualified path in file source v2

### Why are the changes needed?

We didn't catch these issues in #28760 because that test case only checks the error thrown when initializing the file system. If the file system is initialized multiple times during a simple read/write action, the test case actually only exercises the first initialization.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Rewrote the test to make sure the entire data source read/write action can succeed.

Closes #28973 from cloud-fan/pick.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
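For context, a minimal sketch of the behavior being fixed, mirroring the rewritten test. The path and the option key `fs.file.impl.disable.cache` are illustrative only; the point is that a Hadoop configuration key passed as a data source option should now apply throughout the whole read/write action:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: pass Hadoop file system settings as data source options.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val path = "/tmp/spark-31935-demo"  // illustrative path

// The option is merged into the Hadoop Configuration used by the write,
// including the cache refresh at the end of InsertIntoHadoopFsRelationCommand.
spark.range(10).write
  .option("fs.file.impl.disable.cache", "true")
  .mode("overwrite")
  .parquet(path)

// The same option is also honored during schema inference on read.
val df = spark.read
  .option("fs.file.impl.disable.cache", "true")
  .parquet(path)
df.show()
```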
1 parent bc1acfe · commit 7f4d452

File tree: 5 files changed, +36 −23 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 11 additions & 6 deletions
@@ -237,13 +237,18 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = writeLock {
-    val (fs, qualifiedPath) = {
-      val path = new Path(resourcePath)
-      val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-      (fs, fs.makeQualified(path))
-    }
+  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
+    val path = new Path(resourcePath)
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    recacheByPath(spark, path, fs)
+  }

+  /**
+   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = writeLock {
+    val qualifiedPath = fs.makeQualified(resourcePath)
     recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 1 addition & 1 deletion
@@ -184,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
+      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)

       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)
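The command now reuses the FileSystem built from the options-aware Hadoop configuration when refreshing the cache, rather than letting `refreshByPath` rebuild one from the plain session configuration. A minimal sketch of that internal wiring, not a public API; `options` and `outputPath` stand in for the command's fields:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Sketch of the internal pattern; assumes access to Spark's private[sql] state.
def refreshCacheAfterWrite(
    spark: SparkSession,
    outputPath: Path,
    options: Map[String, String]): Unit = {
  // Layer the data source options on top of the session Hadoop conf so that
  // keys such as "fs.defaultFS" supplied per write take effect.
  val hadoopConf = spark.sessionState.newHadoopConfWithOptions(options)
  val fs: FileSystem = outputPath.getFileSystem(hadoopConf)
  // Hand the already-initialized FileSystem to the cache manager instead of
  // having it build a new one from the session conf.
  spark.sharedState.cacheManager.recacheByPath(spark, outputPath, fs)
}
```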

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 4 additions & 2 deletions
@@ -238,7 +238,7 @@ class ParquetFileFormat
         .orElse(filesByType.data.headOption)
         .toSeq
     }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
   }

   case class FileTypes(
@@ -569,11 +569,13 @@ object ParquetFileFormat extends Logging {
    *   S3 nodes).
    */
   def mergeSchemasInParallel(
+      parameters: Map[String, String],
       filesToTouch: Seq[FileStatus],
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+    val serializedConf = new SerializableConfiguration(
+      sparkSession.sessionState.newHadoopConfWithOptions(parameters))

     // !! HACK ALERT !!
     //
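With `parameters` threaded into `mergeSchemasInParallel`, the serialized Hadoop configuration shipped to executors for footer reading now carries the per-read options as well. A user-facing sketch, assuming an active SparkSession named `spark`, an illustrative path, and `fs.file.impl.disable.cache` purely as an example of a Hadoop key passed as an option:

```scala
// Sketch: per-read options now reach the executors that merge Parquet footers.
val merged = spark.read
  .option("mergeSchema", "true")                   // triggers parallel schema merging
  .option("fs.file.impl.disable.cache", "true")    // Hadoop FS key as a data source option
  .parquet("/tmp/parquet-with-mixed-schemas")      // illustrative path
merged.printSchema()
```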

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 19 additions & 13 deletions
@@ -18,11 +18,13 @@
 package org.apache.spark.sql

 import java.io.{File, FileNotFoundException}
+import java.net.URI
 import java.util.Locale

 import scala.collection.mutable

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.SparkException
@@ -479,18 +481,15 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }

   test("SPARK-31935: Hadoop file system config should be effective in data source options") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val defaultFs = "nonexistFS://nonexistFS"
-      val expectMessage = "No FileSystem for scheme: nonexistFS"
-      val message1 = intercept[java.io.IOException] {
-        spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message1 == expectMessage)
-      val message2 = intercept[java.io.IOException] {
-        spark.read.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message2 == expectMessage)
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempDir { dir =>
+        val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
+        spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
+        checkAnswer(
+          spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
+      }
     }
   }

@@ -543,3 +542,10 @@ object TestingUDT {
     override def userClass: Class[NullData] = classOf[NullData]
   }
 }
+
+class FakeFileSystemRequiringDSOption extends LocalFileSystem {
+  override def initialize(name: URI, conf: Configuration): Unit = {
+    super.initialize(name, conf)
+    require(conf.get("ds_option", "") == "value")
+  }
+}
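A brief note on the test design, as far as it can be read from the diff: the fake filesystem is installed for the `file:` scheme via `fs.file.impl` with the FileSystem cache disabled, so every `initialize` call made during the write and the read sees the per-action Hadoop configuration; the `require` on `ds_option` therefore fails the whole action if any initialization misses the data source options, which is exactly the gap the earlier test did not cover.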

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     OrcFileOperator.readSchema(
       files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf()),
+      Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
       ignoreCorruptFiles
     )
   }
