@@ -237,13 +237,18 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = writeLock {
-    val (fs, qualifiedPath) = {
-      val path = new Path(resourcePath)
-      val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-      (fs, fs.makeQualified(path))
-    }
+  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
+    val path = new Path(resourcePath)
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    recacheByPath(spark, path, fs)
+  }
+
+  /**
+   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = writeLock {
+    val qualifiedPath = fs.makeQualified(resourcePath)
     recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }

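The overload taking a resolved Path and FileSystem exists so that a caller which already built an options-aware Hadoop configuration can reuse the same file system for the cache refresh, instead of re-resolving it from a default configuration that ignores per-write options. A minimal sketch of that calling pattern, mirroring the InsertIntoHadoopFsRelationCommand change below; the option map and output path are illustrative, not taken from the patch:

  // Sketch of the calling pattern (internal Spark SQL APIs); values are illustrative.
  import org.apache.hadoop.fs.Path

  val options = Map("fs.defaultFS" -> "hdfs://example-namenode:8020")   // hypothetical per-write options
  val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
  val outputPath = new Path("/example/output/dir")                      // hypothetical output path
  val fs = outputPath.getFileSystem(hadoopConf)
  // ... write files through `fs` ...
  sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)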
@@ -184,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
+      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)
 
       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)
@@ -238,7 +238,7 @@ class ParquetFileFormat
           .orElse(filesByType.data.headOption)
           .toSeq
       }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
   }
 
   case class FileTypes(
@@ -569,11 +569,13 @@ object ParquetFileFormat extends Logging {
    * S3 nodes).
    */
   def mergeSchemasInParallel(
+      parameters: Map[String, String],
       filesToTouch: Seq[FileStatus],
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+    val serializedConf = new SerializableConfiguration(
+      sparkSession.sessionState.newHadoopConfWithOptions(parameters))
 
     // !! HACK ALERT !!
     //
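Passing `parameters` through to the serialized configuration means the data source options supplied for this read reach the executors that merge Parquet footers, not only the driver. As a rough illustration of what `newHadoopConfWithOptions` does (a simplified sketch, not Spark's implementation; the real method also skips a few reserved option keys such as the path options):

  import org.apache.hadoop.conf.Configuration

  // Simplified sketch: copy the session-level Hadoop conf and overlay each
  // data source option as a Hadoop configuration entry.
  def confWithOptions(sessionConf: Configuration, options: Map[String, String]): Configuration = {
    val conf = new Configuration(sessionConf)
    options.foreach { case (key, value) => conf.set(key, value) }
    conf
  }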
@@ -18,11 +18,13 @@
 package org.apache.spark.sql
 
 import java.io.{File, FileNotFoundException}
+import java.net.URI
 import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
@@ -479,18 +481,15 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
   }
 
   test("SPARK-31935: Hadoop file system config should be effective in data source options") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val defaultFs = "nonexistFS://nonexistFS"
-      val expectMessage = "No FileSystem for scheme: nonexistFS"
-      val message1 = intercept[java.io.IOException] {
-        spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message1 == expectMessage)
-      val message2 = intercept[java.io.IOException] {
-        spark.read.option("fs.defaultFS", defaultFs).parquet(path)
-      }.getMessage
-      assert(message2 == expectMessage)
-    }
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempDir { dir =>
+        val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
+        spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
+        checkAnswer(
+          spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
+      }
+    }
   }
 
@@ -543,3 +542,10 @@ object TestingUDT {
     override def userClass: Class[NullData] = classOf[NullData]
   }
 }
+
+class FakeFileSystemRequiringDSOption extends LocalFileSystem {
+  override def initialize(name: URI, conf: Configuration): Unit = {
+    super.initialize(name, conf)
+    require(conf.get("ds_option", "") == "value")
+  }
+}
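`FakeFileSystemRequiringDSOption` fails initialization unless the data source option made it into the Hadoop configuration, which is the user-visible behavior this change enables: file-system settings passed as DataFrameReader/DataFrameWriter options apply to that particular read or write. A hedged end-user example; the bucket and credential values below are placeholders, not part of the patch:

  // Per-read Hadoop/S3A settings supplied as data source options (placeholder values).
  val df = spark.read
    .option("fs.s3a.access.key", "<access-key>")
    .option("fs.s3a.secret.key", "<secret-key>")
    .parquet("s3a://placeholder-bucket/data")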
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     OrcFileOperator.readSchema(
       files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf()),
+      Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
       ignoreCorruptFiles
     )
   }