Commit 1fe95be

[SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options
This is a followup of apache#28760 to fix the remaining issues:

1. should consider data source options when refreshing the cache by path at the end of `InsertIntoHadoopFsRelationCommand`
2. should consider data source options when inferring the schema for a file source
3. should consider data source options when getting the qualified path in file source v2

We didn't catch these issues in apache#28760 because that test only checked the error thrown while initializing the file system; when the file system is initialized multiple times during a single read/write action, the test effectively exercised only the first initialization. The test is now rewritten so that the entire data source read/write action must succeed.

Closes apache#28948 from cloud-fan/fix.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit 6edb20d)
Signed-off-by: Gengliang Wang <[email protected]>
1 parent bc1acfe commit 1fe95be
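
For context, the user-facing behavior this series targets: Hadoop file system settings passed as data source options should take effect across the entire read/write action, including schema inference and cache refresh. A minimal sketch of the usage, assuming an active SparkSession named `spark` (the s3a bucket and credential values below are placeholders, not part of this commit):

  // Per-query Hadoop FS settings supplied as data source options.
  // The keys are standard Hadoop/s3a settings; the values are hypothetical.
  val df = spark.read
    .option("fs.s3a.access.key", "<access-key>")
    .option("fs.s3a.secret.key", "<secret-key>")
    .parquet("s3a://my-bucket/input")

  df.write
    .option("fs.s3a.access.key", "<access-key>")
    .option("fs.s3a.secret.key", "<secret-key>")
    .mode("overwrite")
    .parquet("s3a://my-bucket/output")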

File tree: 5 files changed (+36, −23 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 11 additions & 6 deletions
@@ -237,13 +237,18 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = writeLock {
-    val (fs, qualifiedPath) = {
-      val path = new Path(resourcePath)
-      val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-      (fs, fs.makeQualified(path))
-    }
+  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
+    val path = new Path(resourcePath)
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    recacheByPath(spark, path, fs)
+  }
 
+  /**
+   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = writeLock {
+    val qualifiedPath = fs.makeQualified(resourcePath)
     recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 1 addition & 1 deletion
@@ -184,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
+      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)
 
       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 4 additions & 2 deletions
@@ -238,7 +238,7 @@ class ParquetFileFormat
         .orElse(filesByType.data.headOption)
         .toSeq
     }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
   }
 
   case class FileTypes(
@@ -569,11 +569,13 @@ object ParquetFileFormat extends Logging {
    *       S3 nodes).
    */
   def mergeSchemasInParallel(
+      parameters: Map[String, String],
       filesToTouch: Seq[FileStatus],
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+    val serializedConf = new SerializableConfiguration(
+      sparkSession.sessionState.newHadoopConfWithOptions(parameters))
 
     // !! HACK ALERT !!
     //
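
Note: `newHadoopConfWithOptions` is an existing `SessionState` helper, not added by this commit. A rough sketch of its assumed behavior (not the verbatim Spark implementation): clone the session-level Hadoop configuration and overlay the per-relation data source options, so keys such as `fs.defaultFS` set via `.option(...)` also reach schema merging:

  // Assumed behavior sketch: overlay data source options on top of the session
  // Hadoop conf; "path"/"paths" are reader bookkeeping keys, not Hadoop settings.
  def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
    val hadoopConf = newHadoopConf()
    options.foreach { case (k, v) =>
      if (v != null && k != "path" && k != "paths") {
        hadoopConf.set(k, v)
      }
    }
    hadoopConf
  }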

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 19 additions & 13 deletions
@@ -18,11 +18,13 @@
 package org.apache.spark.sql
 
 import java.io.{File, FileNotFoundException}
+import java.net.URI
 import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
@@ -479,18 +481,15 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   test("SPARK-31935: Hadoop file system config should be effective in data source options") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val defaultFs = "nonexistFS://nonexistFS"
-      val expectMessage = "No FileSystem for scheme: nonexistFS"
-      val message1 = intercept[java.io.IOException] {
-        spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message1 == expectMessage)
-      val message2 = intercept[java.io.IOException] {
-        spark.read.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message2 == expectMessage)
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempDir { dir =>
+        val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
+        spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
+        checkAnswer(
+          spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
+      }
     }
   }
 
@@ -543,3 +542,10 @@ object TestingUDT {
     override def userClass: Class[NullData] = classOf[NullData]
   }
 }
+
+class FakeFileSystemRequiringDSOption extends LocalFileSystem {
+  override def initialize(name: URI, conf: Configuration): Unit = {
+    super.initialize(name, conf)
+    require(conf.get("ds_option", "") == "value")
+  }
+}
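
A note on the test setup: `fs.file.impl` remaps the `file://` scheme to `FakeFileSystemRequiringDSOption`, and `fs.file.impl.disable.cache` is set to `true` so Hadoop does not reuse a cached `FileSystem` instance; every access therefore runs `initialize` again and verifies that the per-query `ds_option` actually reached the Hadoop configuration.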

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     OrcFileOperator.readSchema(
       files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf()),
+      Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
       ignoreCorruptFiles
     )
   }
