@@ -31,13 +31,13 @@ class AvroDataSourceV2 extends FileDataSourceV2 {

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
@@ -248,12 +248,17 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
* `HadoopFsRelation` node(s) as part of its logical plan.
*/
def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
Member:
how about we change the method as

    def recacheByPath(spark: SparkSession, resourcePath: String, options: Map[String, String]=Map.empty)

so that we can avoid the new method below?

Contributor:
That also works, but it feels a little bit weird to couple the data source options concept with the cache manager...
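For context, a compilable sketch of the two API shapes under discussion (bodies elided; this is not the actual CacheManager code, and the trait name is illustrative):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    trait RecacheApiSketch {
      // Reviewer's suggestion: one String-based entry point, with the data source
      // options threaded through and defaulted for existing callers.
      def recacheByPath(
          spark: SparkSession,
          resourcePath: String,
          options: Map[String, String] = Map.empty): Unit

      // Shape taken by this change: the caller resolves an option-aware FileSystem
      // itself, so the cache manager never has to know about data source options.
      def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit
    }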

val (fs, qualifiedPath) = {
val path = new Path(resourcePath)
val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
(fs, fs.makeQualified(path))
}
val path = new Path(resourcePath)
val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
recacheByPath(spark, path, fs)
}

/**
* Tries to re-cache all the cache entries that contain `resourcePath` in one or more
* `HadoopFsRelation` node(s) as part of its logical plan.
*/
def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = {
val qualifiedPath = fs.makeQualified(resourcePath)
recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
}
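A minimal sketch of the intended calling pattern for the new overload, mirroring the InsertIntoHadoopFsRelationCommand change below; it assumes the caller lives inside Spark's sql package (sessionState and sharedState are internal APIs), and the method and parameter names here are illustrative:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession

    def refreshCacheAfterWrite(
        spark: SparkSession,
        outputPath: Path,
        writeOptions: Map[String, String]): Unit = {
      // Build a Hadoop conf that includes the per-write data source options.
      val hadoopConf = spark.sessionState.newHadoopConfWithOptions(writeOptions)
      // Resolve the FileSystem with that conf, then hand both to the cache manager,
      // which stays unaware of data source options.
      val fs = outputPath.getFileSystem(hadoopConf)
      spark.sharedState.cacheManager.recacheByPath(spark, outputPath, fs)
    }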

@@ -192,7 +192,7 @@ case class InsertIntoHadoopFsRelationCommand(
// refresh cached files in FileIndex
fileIndex.foreach(_.refresh())
// refresh data cache if table is cached
sparkSession.catalog.refreshByPath(outputPath.toString)
sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)

if (catalogTable.nonEmpty) {
CommandUtils.updateTableStats(sparkSession, catalogTable.get)
@@ -32,10 +32,12 @@ object SchemaMergeUtils extends Logging {
*/
def mergeSchemasInParallel(
sparkSession: SparkSession,
parameters: Map[String, String],
files: Seq[FileStatus],
schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType])
: Option[StructType] = {
val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
val serializedConf = new SerializableConfiguration(
sparkSession.sessionState.newHadoopConfWithOptions(parameters))

// !! HACK ALERT !!
// Here is a hack for Parquet, but it can be used by Orc as well.
@@ -109,7 +109,7 @@ object OrcUtils extends Logging {
val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
if (orcOptions.mergeSchema) {
SchemaMergeUtils.mergeSchemasInParallel(
sparkSession, files, OrcUtils.readOrcSchemasInParallel)
sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
} else {
OrcUtils.readSchema(sparkSession, files)
}
@@ -475,6 +475,7 @@ object ParquetFileFormat extends Logging {
* S3 nodes).
*/
def mergeSchemasInParallel(
parameters: Map[String, String],
filesToTouch: Seq[FileStatus],
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
@@ -490,7 +491,7 @@
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
}

SchemaMergeUtils.mergeSchemasInParallel(sparkSession, filesToTouch, reader)
SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
}

/**
@@ -104,7 +104,7 @@ object ParquetUtils {
.orElse(filesByType.data.headOption)
.toSeq
}
ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
}

case class FileTypes(
@@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.datasources.v2

import java.util

import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
@@ -53,14 +56,16 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
paths ++ Option(map.get("path")).toSeq
}

protected def getTableName(paths: Seq[String]): String = {
val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
protected def getTableName(map: CaseInsensitiveStringMap, paths: Seq[String]): String = {
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(
map.asCaseSensitiveMap().asScala.toMap)
val name = shortName() + " " + paths.map(qualifiedPathName(_, hadoopConf)).mkString(",")
Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
}

private def qualifiedPathName(path: String): String = {
private def qualifiedPathName(path: String, hadoopConf: Configuration): String = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val fs = hdfsPath.getFileSystem(hadoopConf)
hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
}
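A usage-level sketch of the effect (the URI and path below are made up, not from this PR): because getTableName now builds its Configuration with newHadoopConfWithOptions, a file-system setting passed as a read option reaches the FileSystem used to qualify the table name:

    // "/warehouse/events" has no scheme, so with the option below it is expected to be
    // qualified against hdfs://ns1 rather than the session's default file system.
    val df = spark.read
      .option("fs.defaultFS", "hdfs://ns1")
      .orc("/warehouse/events")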

@@ -31,13 +31,13 @@ class CSVDataSourceV2 extends FileDataSourceV2 {

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
@@ -31,13 +31,13 @@ class JsonDataSourceV2 extends FileDataSourceV2 {

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
@@ -31,13 +31,13 @@ class OrcDataSourceV2 extends FileDataSourceV2 {

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
OrcTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
OrcTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
@@ -31,13 +31,13 @@ class ParquetDataSourceV2 extends FileDataSourceV2 {

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
ParquetTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
ParquetTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
@@ -31,13 +31,13 @@ class TextDataSourceV2 extends FileDataSourceV2 {

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
val tableName = getTableName(options, paths)
TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
@@ -18,12 +18,14 @@
package org.apache.spark.sql

import java.io.{File, FileNotFoundException}
import java.net.URI
import java.nio.file.{Files, StandardOpenOption}
import java.util.Locale

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocalFileSystem, Path}

import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -845,19 +847,15 @@ class FileBasedDataSourceSuite extends QueryTest

test("SPARK-31935: Hadoop file system config should be effective in data source options") {
Seq("parquet", "").foreach { format =>
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> format,
"fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
"fs.file.impl.disable.cache" -> "true") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val defaultFs = "nonexistFS://nonexistFS"
val expectMessage = "No FileSystem for scheme nonexistFS"
val message1 = intercept[java.io.IOException] {
spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
}.getMessage
assert(message1.filterNot(Set(':', '"').contains) == expectMessage)
val message2 = intercept[java.io.IOException] {
spark.read.option("fs.defaultFS", defaultFs).parquet(path)
}.getMessage
assert(message2.filterNot(Set(':', '"').contains) == expectMessage)
val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
checkAnswer(
spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
}
}
}
@@ -932,3 +930,10 @@ object TestingUDT {
override def userClass: Class[NullData] = classOf[NullData]
}
}

class FakeFileSystemRequiringDSOption extends LocalFileSystem {
override def initialize(name: URI, conf: Configuration): Unit = {
super.initialize(name, conf)
require(conf.get("ds_option", "") == "value")
Member:
Nice check!

}
}
@@ -213,9 +213,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten

val schema = SchemaMergeUtils.mergeSchemasInParallel(
spark,
fileStatuses,
schemaReader)
spark, Map.empty, fileStatuses, schemaReader)

assert(schema.isDefined)
assert(schema.get == StructType(Seq(
@@ -70,9 +70,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
if (orcOptions.mergeSchema) {
SchemaMergeUtils.mergeSchemasInParallel(
sparkSession,
files,
OrcFileOperator.readOrcSchemasInParallel)
sparkSession, options, files, OrcFileOperator.readOrcSchemasInParallel)
} else {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
OrcFileOperator.readSchema(