Commit 7e53ff2

[SPARK-8578] [SQL] Should ignore user defined output committer when appending data (branch 1.4)
This is apache#6964 for branch 1.4.

Author: Yin Huai <[email protected]>

Closes apache#6966 from yhuai/SPARK-8578-branch-1.4 and squashes the following commits:

9c3947b [Yin Huai] Do not use a custom output committer when appending data.
1 parent eafbe13 commit 7e53ff2
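
In short, the commit makes output committer selection depend on whether the write is an append to an existing directory. Below is a minimal sketch of the new rule, written as a standalone helper for readability only; it is not the Spark code itself, and the real implementation is in the commands.scala diff further down.

import org.apache.hadoop.mapreduce.OutputCommitter

// isAppend is true only when the target path already exists and the save mode is Append.
def chooseCommitter(
    isAppend: Boolean,
    userCommitter: Option[OutputCommitter],   // configured via SQLConf.OUTPUT_COMMITTER_CLASS
    defaultCommitter: OutputCommitter         // the committer of the file output format
  ): OutputCommitter = {
  if (isAppend) {
    // Appending: always fall back to the format's own committer, because a custom
    // (e.g. direct-to-S3) committer may leave partial data behind if the job fails.
    defaultCommitter
  } else {
    // Non-append writes keep the previous behavior.
    userCommitter.getOrElse(defaultCommitter)
  }
}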

File tree

2 files changed: +136 -36 lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 54 additions & 35 deletions
@@ -97,7 +97,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
-    val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+    val pathExists = fs.exists(qualifiedOutputPath)
+    val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         sys.error(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
@@ -108,6 +109,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
       case (SaveMode.Ignore, exists) =>
         !exists
     }
+    // If we are appending data to an existing dir.
+    val isAppend = (pathExists) && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -133,10 +136,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
       val partitionColumns = relation.partitionColumns.fieldNames
       if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, job), df)
+        insert(new DefaultWriterContainer(relation, job, isAppend), df)
       } else {
         val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
         insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
       }
     }
@@ -286,7 +289,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
 private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
-    @transient job: Job)
+    @transient job: Job,
+    isAppend: Boolean)
   extends SparkHadoopMapReduceUtil
   with Logging
   with Serializable {
@@ -365,34 +369,47 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
-
-    Option(committerClass).map { clazz =>
-      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-      // has an associated output committer. To override this output committer,
-      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-      // If a data source needs to override the output committer, it needs to set the
-      // output committer in prepareForWrite method.
-      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-        // The specified output committer is a FileOutputCommitter.
-        // So, we will use the FileOutputCommitter-specified constructor.
-        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        ctor.newInstance(new Path(outputPath), context)
-      } else {
-        // The specified output committer is just a OutputCommitter.
-        // So, we will use the no-argument constructor.
-        val ctor = clazz.getDeclaredConstructor()
-        ctor.newInstance()
+    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, in S3, direct parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+      logInfo(
+        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+          "for appending.")
+      defaultOutputCommitter
+    } else {
+      val committerClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
+
+      Option(committerClass).map { clazz =>
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in prepareForWrite method.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(outputPath), context)
+        } else {
+          // The specified output committer is just a OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      }.getOrElse {
+        // If output committer class is not set, we will use the one associated with the
+        // file output format.
+        logInfo(
          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
       }
-    }.getOrElse {
-      // If output committer class is not set, we will use the one associated with the
-      // file output format.
-      val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
-      outputCommitter
     }
   }
 
@@ -442,8 +459,9 @@ private[sql] abstract class BaseWriterContainer(
 
 private[sql] class DefaultWriterContainer(
     @transient relation: HadoopFsRelation,
-    @transient job: Job)
-  extends BaseWriterContainer(relation, job) {
+    @transient job: Job,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
 
   @transient private var writer: OutputWriter = _
 
@@ -482,8 +500,9 @@ private[sql] class DynamicPartitionWriterContainer(
     @transient relation: HadoopFsRelation,
     @transient job: Job,
     partitionColumns: Array[String],
-    defaultPartitionName: String)
-  extends BaseWriterContainer(relation, job) {
+    defaultPartitionName: String,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
   @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _

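From the user's side, the change only shows up when appending to a path that already holds data. The following driver program is a hedged illustration, not part of the patch: MyCustomCommitter, Spark8578Demo, and the /tmp path are made up for the example, and the configuration key is the value behind SQLConf.OUTPUT_COMMITTER_CLASS, which is "spark.sql.sources.outputCommitterClass" in this branch (verify against your build).

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Stand-in for whatever custom committer a user would normally plug in.
class MyCustomCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context)

object Spark8578Demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("spark-8578-demo").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.range(1, 10).toDF("i")

    sc.hadoopConfiguration.set(
      "spark.sql.sources.outputCommitterClass", classOf[MyCustomCommitter].getName)

    val path = "/tmp/spark-8578-demo"
    // First write: the path does not exist yet, so isAppend is false and the
    // user-defined MyCustomCommitter is used.
    df.write.mode("append").format("parquet").save(path)
    // Second write: the path exists and the mode is Append, so the committer that ships
    // with the file output format is used instead, and the driver logs
    // "Using output committer class ... for appending."
    df.write.mode("append").format("parquet").save(path)

    sc.stop()
  }
}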
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 82 additions & 1 deletion
@@ -17,10 +17,16 @@
 
 package org.apache.spark.sql.sources
 
+import scala.collection.JavaConversions._
+
 import java.io.File
 
 import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
@@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
   // requirement. We probably want to move this test case to spark-integration-tests or spark-perf
   // later.
-  test("SPARK-8406: Avoids name collision while writing Parquet files") {
+  test("SPARK-8406: Avoids name collision while writing files") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
       sqlContext
@@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-8578 specified custom output committer will not be used to append data") {
+    val clonedConf = new Configuration(configuration)
+    try {
+      val df = sqlContext.range(1, 10).toDF("i")
+      withTempPath { dir =>
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because data already exists,
+        // this append should succeed because we will use the output committer associated
+        // with the file format and AlwaysFailOutputCommitter will not be used.
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          sqlContext.read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .load(dir.getCanonicalPath),
+          df.unionAll(df))
+
+        // This will fail because AlwaysFailOutputCommitter is used when we do not append.
+        intercept[Exception] {
+          df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+      withTempPath { dir =>
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because there is no existing data,
+        // this append will fail because AlwaysFailOutputCommitter is used when we are
+        // not appending to an existing dir.
+        intercept[Exception] {
+          df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends FileOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commitment failure for testing purposes.")
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailParquetOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commitment failure for testing purposes.")
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
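
The try/finally bookkeeping in the test above exists only because Hadoop 1 has no Configuration.unset, so the suite snapshots the whole Hadoop configuration up front and replays it afterwards. The same pattern, pulled out into a hypothetical helper for clarity (this helper is not part of the Spark suite; it is a sketch of the design choice):

import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration

// Runs `body`, then restores `conf` to the state it had before, entry by entry.
def withRestoredConf[T](conf: Configuration)(body: => T): T = {
  val snapshot = new Configuration(conf)   // copy constructor snapshots every entry
  try {
    body
  } finally {
    conf.clear()                           // Hadoop 1 cannot unset single keys
    snapshot.foreach(entry => conf.set(entry.getKey, entry.getValue))
  }
}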
