
Commit 3356b8b

gatorsmile authored and cloud-fan committed
[SPARK-19092][SQL] Save() API of DataFrameWriter should not scan all the saved files
### What changes were proposed in this pull request?

`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) is performing an unnecessary full filesystem scan of the files it has just saved. The save() API is the most basic/core API in `DataFrameWriter`, so we should avoid this scan there. The related PR: #16090

### How was this patch tested?

Updated the existing test cases.

Author: gatorsmile <[email protected]>

Closes #16481 from gatorsmile/saveFileScan.
1 parent c983267 commit 3356b8b
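Note on the shape of the change: DataSource.scala (diffed below) now keeps a private writeInFileFormat() helper that only writes, and exposes two entry points on top of it: write(), which returns Unit and backs the plain save() path, and writeAndRead(), which additionally resolves a BaseRelation for CREATE TABLE AS SELECT. The toy sketch below is not Spark code; names such as doWrite, scan and filesScanned are illustrative stand-ins. It only shows why returning a relation from the write path forced a re-listing of the just-saved files, and how splitting the API avoids it:

// Toy illustration only (not Spark code): returning a relation from the write
// path forces a scan of the output; a write-only entry point does not.
object SaveScanSketch {
  final case class Relation(files: Seq[String])

  // Stand-in for a file-discovery counter such as METRIC_FILES_DISCOVERED.
  var filesScanned = 0

  def doWrite(path: String, rows: Seq[Int]): Unit = {
    // Pretend to write `rows` under `path`; nothing is listed here.
    val _ = (path, rows)
  }

  def scan(path: String): Relation = {
    filesScanned += 1 // expensive: lists every saved file
    Relation(Seq(s"$path/part-00000"))
  }

  // CTAS-style path: the caller needs the relation back, so a scan is unavoidable.
  def writeAndRead(path: String, rows: Seq[Int]): Relation = {
    doWrite(path, rows)
    scan(path)
  }

  // Plain save()-style path: just write, return Unit, no listing.
  def write(path: String, rows: Seq[Int]): Unit = doWrite(path, rows)

  def main(args: Array[String]): Unit = {
    write("/tmp/out", Seq(1, 2, 3))
    assert(filesScanned == 0) // the write-only path never listed anything
    writeAndRead("/tmp/ctas", Seq(1, 2, 3))
    assert(filesScanned == 1) // only the path that needs a relation scans
  }
}

In the real diff, the scan corresponds to copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation(), which now runs only inside writeAndRead().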

File tree

3 files changed (+106, -97 lines changed)


sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 1 addition & 1 deletion
@@ -199,7 +199,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
-      dataSource.write(mode, Dataset.ofRows(session, query))
+      dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 98 additions & 74 deletions
@@ -413,10 +413,85 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
-  def write(
-      mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+  /**
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
+   */
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+    // Don't glob path for the write path. The contracts here are:
+    //  1. Only one output path can be specified on the write path;
+    //  2. Output path must be a legal HDFS style file system path;
+    //  3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
+
+    // If we are appending to a table that already exists, make sure the partitioning matches
+    // up. If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val partitionAttributes = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+      sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+      }.head
+    }
+    // For partitioned relation r, r.schema's column ordering can be different from the column
+    // ordering of data.logicalPlan (partition columns are all moved after data column). This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitions = Map.empty,
+        partitionColumns = partitionAttributes,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable,
+        fileIndex = fileIndex)
+    sparkSession.sessionState.executePlan(plan).toRdd
+  }
+
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
+   * the following reading.
+   */
+  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
@@ -425,78 +500,27 @@ case class DataSource(
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
       case format: FileFormat =>
-        // Don't glob path for the write path. The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ caseInsensitiveOptions.get("path")
-        val outputPath = if (allPaths.length == 1) {
-          val path = new Path(allPaths.head)
-          val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        } else {
-          throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
-            s"got: ${allPaths.mkString(", ")}")
-        }
-
-        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumn(
-          data.schema, partitionColumns, caseSensitive)
-
-        // If we are appending to a table that already exists, make sure the partitioning matches
-        // up. If we fail to load the table for whatever reason, ignore the check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumns = Try {
-            getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-          }.getOrElse(Seq.empty[String])
-          // TODO: Case sensitivity.
-          val sameColumns =
-            existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-          if (existingPartitionColumns.nonEmpty && !sameColumns) {
-            throw new AnalysisException(
-              s"""Requested partitioning does not match existing partitioning.
-                 |Existing partitioning columns:
-                 |  ${existingPartitionColumns.mkString(", ")}
-                 |Requested partitioning columns:
-                 |  ${partitionColumns.mkString(", ")}
-                 |""".stripMargin)
-          }
-        }
-
-        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-        // not need to have the query as child, to avoid to analyze an optimized query,
-        // because InsertIntoHadoopFsRelationCommand will be optimized first.
-        val partitionAttributes = partitionColumns.map { name =>
-          val plan = data.logicalPlan
-          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-            throw new AnalysisException(
-              s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
-          }.asInstanceOf[Attribute]
-        }
-        val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
-          sparkSession.table(tableIdent).queryExecution.analyzed.collect {
-            case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
-          }.head
-        }
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column). This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        val plan =
-          InsertIntoHadoopFsRelationCommand(
-            outputPath = outputPath,
-            staticPartitions = Map.empty,
-            partitionColumns = partitionAttributes,
-            bucketSpec = bucketSpec,
-            fileFormat = format,
-            options = options,
-            query = data.logicalPlan,
-            mode = mode,
-            catalogTable = catalogTable,
-            fileIndex = fileIndex)
-        sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
+        writeInFileFormat(format, mode, data)
+        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+      case _ =>
+        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+    }
+  }
 
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   */
+  def write(mode: SaveMode, data: DataFrame): Unit = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
+
+    providingClass.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+      case format: FileFormat =>
+        writeInFileFormat(format, mode, data)
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
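With the DataSource.scala diff above, the callers split cleanly: CreateDataSourceTableAsSelectCommand switches to writeAndRead() because CTAS needs the resolved relation back, while DataFrameWriter.save() keeps going through write() and no longer resolves a relation after writing. As a hedged spot check one could run against a local build (a sketch only: the SparkSession setup, output path, and object name are assumptions; the metric and its import follow the test suite diffed next), print the file-discovery counter after a plain save():

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession

object SaveNoScanCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("save-no-scan-check")
      .getOrCreate()
    import spark.implicits._

    HiveCatalogMetrics.reset()
    // A plain save(): with this patch the write path itself should not need to
    // re-list what it just wrote.
    Seq(1, 2, 3).toDF("id").write.mode("overwrite").parquet("/tmp/save_no_scan_check")

    // Same counter the updated PartitionedTablePerfStatsSuite inspects.
    val discovered = HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount()
    println(s"files discovered during save(): $discovered")

    spark.stop()
  }
}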

sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala

Lines changed: 7 additions & 22 deletions
@@ -62,17 +62,12 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedHiveTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
-
     spark.sql(s"""
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
   }
 
   private def setupPartitionedDatasourceTable(
-      tableName: String, dir: File, scale: Int,
-      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
+      tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
-    if (clearMetricsBeforeCreate) {
-      HiveCatalogMetrics.reset()
-    }
-
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
@@ -271,8 +261,8 @@ class PartitionedTablePerfStatsSuite
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
-          setupPartitionedDatasourceTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          HiveCatalogMetrics.reset()
+          setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
@@ -285,8 +275,7 @@ class PartitionedTablePerfStatsSuite
       withTable("test") {
         withTempDir { dir =>
           HiveCatalogMetrics.reset()
-          setupPartitionedHiveTable(
-            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          setupPartitionedHiveTable("test", dir, scale = 10, repair = false)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
         })
         executorPool.shutdown()
         executorPool.awaitTermination(30, TimeUnit.SECONDS)
-        // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
-        // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
-        // only one thread can really do the build, so the listing job count is 2, the other
-        // one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
-        assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
-        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+        assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
       }
     }
   }
