Commit c71b254

ericl authored and cloud-fan committed
[SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API
## What changes were proposed in this pull request?

Currently in SQL we implement overwrites by calling fs.delete() directly on the original data. This is not ideal since the original files end up deleted even if the job aborts. We should extend the commit protocol to allow file overwrites to be managed as well.

## How was this patch tested?

Existing tests. I also fixed a bunch of tests that were depending on the commit protocol implementation being set to the legacy mapreduce one.

cc rxin cloud-fan

Author: Eric Liang <[email protected]>
Author: Eric Liang <[email protected]>

Closes #16554 from ericl/add-delete-protocol.
1 parent 5db35b3 commit c71b254

File tree: 9 files changed (+149, -110 lines)


core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 9 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.internal.io
 
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.util.Utils
@@ -112,6 +113,14 @@ abstract class FileCommitProtocol {
    * just crashes (or killed) before it can call abort.
    */
   def abortTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Specifies that a file should be deleted with the commit of this job. The default
+   * implementation deletes the file immediately.
+   */
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+    fs.delete(path, recursive)
+  }
 }
 
 
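The default implementation keeps the old eager behavior (delete immediately), so existing protocols are unaffected. A protocol that wants overwrites to survive a job abort can override the hook and defer the deletes until the job commits. A minimal sketch of that idea, not part of this patch — the class name and deferral strategy are assumptions, and it presumes the two-argument HadoopMapReduceCommitProtocol constructor of this Spark version:

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.JobContext

import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical protocol: record the paths passed to deleteWithJob and only remove
// them once the job commits, so an aborted overwrite leaves the old files in place.
class DeferredDeleteCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  // Deletes requested by the write path; used only on the driver, hence @transient.
  @transient private lazy val pendingDeletes = ArrayBuffer[(FileSystem, Path, Boolean)]()

  override def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
    pendingDeletes += ((fs, path, recursive))
    true // report success now; the actual delete happens in commitJob
  }

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    super.commitJob(jobContext, taskCommits)
    // The job has committed, so it is now safe to drop the superseded files.
    pendingDeletes.foreach { case (fs, p, recursive) => fs.delete(p, recursive) }
  }
}

In this sketch deleteWithJob only records the request; commitJob performs the deletes after the superclass has committed the new output, so an aborted job never touches the original files.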

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 14 additions & 11 deletions
@@ -88,11 +88,20 @@ case class InsertIntoHadoopFsRelationCommand(
     }
 
     val pathExists = fs.exists(qualifiedOutputPath)
+    // If we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)
+
+    val committer = FileCommitProtocol.instantiate(
+      sparkSession.sessionState.conf.fileCommitProtocolClass,
+      jobId = java.util.UUID.randomUUID().toString,
+      outputPath = outputPath.toString,
+      isAppend = isAppend)
+
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
-        deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations)
+        deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
         true
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
@@ -101,15 +110,8 @@
       case (s, exists) =>
        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
    }
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
 
    if (doInsertion) {
-      val committer = FileCommitProtocol.instantiate(
-        sparkSession.sessionState.conf.fileCommitProtocolClass,
-        jobId = java.util.UUID.randomUUID().toString,
-        outputPath = outputPath.toString,
-        isAppend = isAppend)
 
      // Callback for updating metastore partition metadata after the insertion job completes.
      def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
@@ -160,7 +162,8 @@
  private def deleteMatchingPartitions(
      fs: FileSystem,
      qualifiedOutputPath: Path,
-      customPartitionLocations: Map[TablePartitionSpec, String]): Unit = {
+      customPartitionLocations: Map[TablePartitionSpec, String],
+      committer: FileCommitProtocol): Unit = {
    val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
      "/" + partitionColumns.flatMap { p =>
        staticPartitions.get(p.name) match {
@@ -175,7 +178,7 @@
    }
    // first clear the path determined by the static partition keys (e.g. /table/foo=1)
    val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
+    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
      throw new IOException(s"Unable to clear output " +
        s"directory $staticPrefixPath prior to writing to it")
    }
@@ -185,7 +188,7 @@
        (staticPartitions.toSet -- spec).isEmpty,
        "Custom partition location did not match static partitioning keys")
      val path = new Path(customLoc)
-      if (fs.exists(path) && !fs.delete(path, true)) {
+      if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
        throw new IOException(s"Unable to clear partition " +
          s"directory $path prior to writing to it")
      }
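Because the committer is now instantiated before the overwrite branch runs, whichever class spark.sql.sources.commitProtocolClass names decides how existing data is cleared. A hedged usage sketch — the custom protocol class is the hypothetical one from the earlier example, and it assumes FileCommitProtocol.instantiate can fall back to a (jobId, path) constructor:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("overwrite-via-commit-protocol")
  .master("local[*]")
  // Hypothetical protocol from the sketch above; not part of this patch.
  .config("spark.sql.sources.commitProtocolClass",
    "com.example.DeferredDeleteCommitProtocol")
  .getOrCreate()

// SaveMode.Overwrite now routes its deletes through deleteWithJob on that protocol
// instead of calling fs.delete() directly.
spark.range(10).write.mode("overwrite").parquet("/tmp/overwrite-demo")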

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
      // ignore hidden files
      val allFiles = dir.listFiles(new FilenameFilter {
        override def accept(dir: File, name: String): Boolean = {
-          !name.startsWith(".")
+          !name.startsWith(".") && !name.startsWith("_")
        }
      })
      val totalSize = allFiles.map(_.length()).sum

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 66 additions & 56 deletions
@@ -40,6 +40,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -462,16 +463,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
  }

  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
-    val extraOptions = Map(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
-      "spark.sql.parquet.output.committer.class" ->
-        classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
-    )
-    withTempPath { dir =>
-      val message = intercept[SparkException] {
-        spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-      }.getCause.getMessage
-      assert(message === "Intentional exception for testing purposes")
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      val extraOptions = Map(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
+        "spark.sql.parquet.output.committer.class" ->
+          classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
+      )
+      withTempPath { dir =>
+        val message = intercept[SparkException] {
+          spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(message === "Intentional exception for testing purposes")
+      }
    }
  }

@@ -488,58 +492,64 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
  }

  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
-    // Using a output committer that always fail when committing a task, so that both
-    // `commitTask()` and `abortTask()` are invoked.
-    val extraOptions = Map[String, String](
-      "spark.sql.parquet.output.committer.class" ->
-        classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
-    )
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      // Using a output committer that always fail when committing a task, so that both
+      // `commitTask()` and `abortTask()` are invoked.
+      val extraOptions = Map[String, String](
+        "spark.sql.parquet.output.committer.class" ->
+          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
+      )
+
+      // Before fixing SPARK-7837, the following code results in an NPE because both
+      // `commitTask()` and `abortTask()` try to close output writers.

-    // Before fixing SPARK-7837, the following code results in an NPE because both
-    // `commitTask()` and `abortTask()` try to close output writers.
-
-    withTempPath { dir =>
-      val m1 = intercept[SparkException] {
-        spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-      }.getCause.getMessage
-      assert(m1.contains("Intentional exception for testing purposes"))
-    }
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m1.contains("Intentional exception for testing purposes"))
+      }

-    withTempPath { dir =>
-      val m2 = intercept[SparkException] {
-        val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1)
-        df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
-      }.getCause.getMessage
-      assert(m2.contains("Intentional exception for testing purposes"))
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1)
+          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m2.contains("Intentional exception for testing purposes"))
+      }
    }
  }

  test("SPARK-11044 Parquet writer version fixed as version1 ") {
-    // For dictionary encoding, Parquet changes the encoding types according to its writer
-    // version. So, this test checks one of the encoding types in order to ensure that
-    // the file is written with writer version2.
-    val extraOptions = Map[String, String](
-      // Write a Parquet file with writer version2.
-      ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
-      // By default, dictionary encoding is enabled from Parquet 1.2.0 but
-      // it is enabled just in case.
-      ParquetOutputFormat.ENABLE_DICTIONARY -> "true"
-    )
-
-    val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
-
-    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
-      withTempPath { dir =>
-        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
-        spark.range(1 << 16).selectExpr("(id % 4) AS i")
-          .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
-
-        val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
-        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
-
-        // If the file is written with version2, this should include
-        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
-        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      // For dictionary encoding, Parquet changes the encoding types according to its writer
+      // version. So, this test checks one of the encoding types in order to ensure that
+      // the file is written with writer version2.
+      val extraOptions = Map[String, String](
+        // Write a Parquet file with writer version2.
+        ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
+        // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+        // it is enabled just in case.
+        ParquetOutputFormat.ENABLE_DICTIONARY -> "true"
+      )
+
+      val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
+
+      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+        withTempPath { dir =>
+          val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+          spark.range(1 << 16).selectExpr("(id % 4) AS i")
+            .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
+
+          val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
+          val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+          // If the file is written with version2, this should include
+          // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+          assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+        }
      }
    }
  }
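These tests assert on the behavior of a specific Hadoop output committer, so they now pin the commit protocol explicitly instead of relying on whatever the default happens to be. The recurring pattern, shown here as a standalone sketch inside a SharedSQLContext test suite (the write and the assertion are placeholders):

withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
    classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
  withTempPath { dir =>
    // Any write that should go through the legacy MapReduce-based committer path.
    spark.range(10).write.parquet(dir.getCanonicalPath)
    // ... assertions that depend on that committer's on-disk output ...
  }
}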

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 6 additions & 3 deletions
@@ -455,15 +455,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
      assert(partDf.schema.map(_.name) === Seq("intField", "stringField"))

      path.listFiles().foreach { f =>
-        if (f.getName.toLowerCase().endsWith(".parquet")) {
+        if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) {
          // when the input is a path to a parquet file
          val df = spark.read.parquet(f.getCanonicalPath)
          assert(df.schema.map(_.name) === Seq("intField", "stringField"))
        }
      }

      path.listFiles().foreach { f =>
-        if (f.getName.toLowerCase().endsWith(".parquet")) {
+        if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) {
          // when the input is a path to a parquet file but `basePath` is overridden to
          // the base path containing partitioning directories
          val df = spark
@@ -932,7 +932,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
    withTempPath { dir =>
      val path = dir.getCanonicalPath

-      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+      withSQLConf(
+          ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+          "spark.sql.sources.commitProtocolClass" ->
+            classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
        spark.range(3).write.parquet(s"$path/p0=0/p1=0")
      }

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 5 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -178,6 +179,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
    }

    withSQLConf(
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName,
      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true",
      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true"
@@ -186,6 +189,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
    }

    withSQLConf(
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName,
      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false"
    ) {

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala

Lines changed: 3 additions & 1 deletion
@@ -93,7 +93,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
        .orc(path)

      // Check if this is compressed as ZLIB.
-      val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
+      val maybeOrcFile = new File(path).listFiles().find { f =>
+        !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc")
+      }
      assert(maybeOrcFile.isDefined)
      val orcFilePath = maybeOrcFile.get.toPath.toString
      val expectedCompressionKind =
