
Commit 6edb20d

cloud-fan authored and gengliangwang committed
[SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options
### What changes were proposed in this pull request?

This is a followup of #28760 to fix the remaining issues:
1. should consider data source options when refreshing the cache by path at the end of `InsertIntoHadoopFsRelationCommand`
2. should consider data source options when inferring the schema for a file source
3. should consider data source options when getting the qualified path in file source v2

### Why are the changes needed?

We didn't catch these issues in #28760 because its test case only checks for an error when initializing the file system. If the file system is initialized multiple times during a single read/write action, the test effectively exercises only the first initialization.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Rewrote the test so that it verifies the entire data source read/write action can succeed.

Closes #28948 from cloud-fan/fix.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
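For context, here is a minimal sketch of the user-facing behavior this followup completes; the bucket name and credential values are hypothetical placeholders, not taken from the patch. Hadoop file system settings passed as data source options should apply to the whole read/write action, including schema inference and cache refresh, not only the first file system initialization.

// A minimal sketch, assuming a running SparkSession `spark`; the s3a bucket
// and credential values are hypothetical.
val df = spark.read
  .option("fs.s3a.access.key", "ACCESS_KEY")  // Hadoop FS setting passed as a data source option
  .option("fs.s3a.secret.key", "SECRET_KEY")
  .parquet("s3a://some-bucket/input")

// After this followup, the same options also take effect during schema
// inference and when the output path is re-cached at the end of the write.
df.write
  .option("fs.s3a.access.key", "ACCESS_KEY")
  .option("fs.s3a.secret.key", "SECRET_KEY")
  .parquet("s3a://some-bucket/output")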
1 parent f843a5b commit 6edb20d

File tree: 16 files changed, +59 −45 lines

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala

Lines changed: 2 additions & 2 deletions

@@ -31,13 +31,13 @@ class AvroDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 10 additions & 5 deletions

@@ -248,12 +248,17 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
   def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
-    val (fs, qualifiedPath) = {
-      val path = new Path(resourcePath)
-      val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-      (fs, fs.makeQualified(path))
-    }
+    val path = new Path(resourcePath)
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    recacheByPath(spark, path, fs)
+  }
 
+  /**
+   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = {
+    val qualifiedPath = fs.makeQualified(resourcePath)
     recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }
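The new overload lets a caller that has already built a FileSystem from an options-aware Hadoop configuration pass it in directly, instead of recacheByPath rebuilding one from the session configuration alone. A hedged sketch of the call-site pattern, usable only from within Spark's sql package since cacheManager is internal; the configuration and output path below are placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch only: at the real call site (InsertIntoHadoopFsRelationCommand) the
// configuration already carries the data source options; a fresh Configuration
// stands in for it here, and the output path is hypothetical.
val hadoopConf = new Configuration()
val outputPath = new Path("/tmp/example/output")
val fs = outputPath.getFileSystem(hadoopConf)
spark.sharedState.cacheManager.recacheByPath(spark, outputPath, fs)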

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 1 addition & 1 deletion

@@ -192,7 +192,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
+      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)
 
       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala

Lines changed: 3 additions & 1 deletion

@@ -32,10 +32,12 @@ object SchemaMergeUtils extends Logging {
    */
   def mergeSchemasInParallel(
       sparkSession: SparkSession,
+      parameters: Map[String, String],
       files: Seq[FileStatus],
       schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType])
       : Option[StructType] = {
-    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+    val serializedConf = new SerializableConfiguration(
+      sparkSession.sessionState.newHadoopConfWithOptions(parameters))
 
     // !! HACK ALERT !!
     // Here is a hack for Parquet, but it can be used by Orc as well.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -109,7 +109,7 @@ object OrcUtils extends Logging {
     val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
     if (orcOptions.mergeSchema) {
       SchemaMergeUtils.mergeSchemasInParallel(
-        sparkSession, files, OrcUtils.readOrcSchemasInParallel)
+        sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
     } else {
       OrcUtils.readSchema(sparkSession, files)
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 2 additions & 1 deletion

@@ -475,6 +475,7 @@ object ParquetFileFormat extends Logging {
    *     S3 nodes).
    */
   def mergeSchemasInParallel(
+      parameters: Map[String, String],
       filesToTouch: Seq[FileStatus],
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
@@ -490,7 +491,7 @@ object ParquetFileFormat extends Logging {
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
     }
 
-    SchemaMergeUtils.mergeSchemasInParallel(sparkSession, filesToTouch, reader)
+    SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
   }
 
   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -104,7 +104,7 @@ object ParquetUtils {
         .orElse(filesByType.data.headOption)
         .toSeq
     }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
   }
 
   case class FileTypes(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala

Lines changed: 9 additions & 4 deletions

@@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
@@ -53,14 +56,16 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
     paths ++ Option(map.get("path")).toSeq
   }
 
-  protected def getTableName(paths: Seq[String]): String = {
-    val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
+  protected def getTableName(map: CaseInsensitiveStringMap, paths: Seq[String]): String = {
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(
+      map.asCaseSensitiveMap().asScala.toMap)
+    val name = shortName() + " " + paths.map(qualifiedPathName(_, hadoopConf)).mkString(",")
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
   }
 
-  private def qualifiedPathName(path: String): String = {
+  private def qualifiedPathName(path: String, hadoopConf: Configuration): String = {
     val hdfsPath = new Path(path)
-    val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val fs = hdfsPath.getFileSystem(hadoopConf)
     hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
   }
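For illustration, a minimal sketch of the `newHadoopConfWithOptions` behavior the diff relies on: per-source options are layered on top of the session's Hadoop configuration, so a setting passed with the read/write wins over the session-level value. Note that `sessionState` is an internal API, so this only compiles inside Spark's sql package; the `fs.defaultFS` value is a hypothetical example.

// Sketch, assuming a SparkSession `spark`; the namenode address is hypothetical.
val hadoopConf = spark.sessionState.newHadoopConfWithOptions(
  Map("fs.defaultFS" -> "hdfs://namenode:8020"))
val fs = new Path("/data").getFileSystem(hadoopConf)  // resolved against the merged conf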

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala

Lines changed: 2 additions & 2 deletions

@@ -31,13 +31,13 @@ class CSVDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala

Lines changed: 2 additions & 2 deletions

@@ -31,13 +31,13 @@ class JsonDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
  }
 }
