From 78a2172e4261b52297c66fbe5624b9f04258c2d4 Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Mon, 26 Oct 2015 23:43:56 -0700
Subject: [PATCH 1/4] [SPARK-11382] [SQL] Improve error message when hitting
 this issue.

The issue is that the output committer is not idempotent and retry attempts
will fail because the output file already exists. It is not safe to clean up
the file, as this output committer is by design not retryable. Currently, the
job fails with a confusing "file exists" error.

This patch is a stopgap that tells the user to look at the top of the error
log for the proper message.

This is difficult to test locally because Spark is hardcoded not to retry.
Manually verified by upping the retry attempts.
---
 .../execution/datasources/WriterContainer.scala    | 17 ++++++++++++++++-
 .../parquet/DirectParquetOutputCommitter.scala     |  3 ++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 1b59b19d9420..15d4c62aec68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -234,7 +234,22 @@ private[sql] class DefaultWriterContainer(
     executorSideSetup(taskContext)
     val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
     configuration.set("spark.sql.sources.output.path", outputPath)
-    val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+    val writer = try {
+      outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+    } catch {
+      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
+        if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
+          // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+          // attempts, the task will fail because the output file is created from a prior attempt.
+          // This often means the most visible error to the user is misleading. Augment the error
+          // to tell the user to look for the actual error.
+          throw new SparkException("The output file already exists but this could be due to a " +
+            "failure from a earlier attempt. Look through the earlier logs for the first " +
+            "error.\n File exists error: " + e)
+        }
+        throw e
+      case e: Exception => throw e
+    }
     writer.initConverter(dataSchema)
 
     var writerClosed = false

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 300e8677b312..1a4e99ff10af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -41,7 +41,8 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
  * no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
  * left empty).
  */
-private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+private[datasources] class DirectParquetOutputCommitter(
+    outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
 
   val LOG = Log.getLog(classOf[ParquetOutputCommitter])

From 0d35d604bdd7291cacda7e893a266387fe93b5df Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Mon, 30 Nov 2015 12:52:00 -0800
Subject: [PATCH 2/4] Code review.

---
 .../apache/spark/sql/execution/datasources/WriterContainer.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 15d4c62aec68..3266007fd46d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -248,7 +248,6 @@ private[sql] class DefaultWriterContainer(
             "error.\n File exists error: " + e)
         }
         throw e
-      case e: Exception => throw e
     }
 
     writer.initConverter(dataSchema)

From 3ed9f67d4d9ebc1b7061357347dc717433acad33 Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Mon, 30 Nov 2015 13:58:27 -0800
Subject: [PATCH 3/4] CR

---
 .../spark/sql/execution/datasources/WriterContainer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 3266007fd46d..e8f77778c21a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -244,8 +244,8 @@ private[sql] class DefaultWriterContainer(
           // This often means the most visible error to the user is misleading. Augment the error
           // to tell the user to look for the actual error.
           throw new SparkException("The output file already exists but this could be due to a " +
-            "failure from a earlier attempt. Look through the earlier logs for the first " +
-            "error.\n File exists error: " + e)
+            "failure from an earlier attempt. Look through the earlier logs or stage page for " +
+            "the first error.\n File exists error: " + e)
         }
         throw e
     }

From c4375ec7396bf914b0b74b03c40e774ff4bc9477 Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Tue, 1 Dec 2015 11:19:52 -0800
Subject: [PATCH 4/4] Add another place to catch the exception.

---
 .../datasources/WriterContainer.scala | 36 ++++++++++---------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index e8f77778c21a..ad5536725889 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -124,6 +124,24 @@ private[sql] abstract class BaseWriterContainer(
     }
   }
 
+  protected def newOutputWriter(path: String): OutputWriter = {
+    try {
+      outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
+    } catch {
+      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
+        if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
+          // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+          // attempts, the task will fail because the output file is created from a prior attempt.
+          // This often means the most visible error to the user is misleading. Augment the error
+          // to tell the user to look for the actual error.
+          throw new SparkException("The output file already exists but this could be due to a " +
+            "failure from an earlier attempt. Look through the earlier logs or stage page for " +
+            "the first error.\n File exists error: " + e)
+        }
+        throw e
+    }
+  }
+
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
 
@@ -234,21 +252,7 @@ private[sql] class DefaultWriterContainer(
     executorSideSetup(taskContext)
     val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
     configuration.set("spark.sql.sources.output.path", outputPath)
-    val writer = try {
-      outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
-    } catch {
-      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
-        if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
-          // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
-          // attempts, the task will fail because the output file is created from a prior attempt.
-          // This often means the most visible error to the user is misleading. Augment the error
-          // to tell the user to look for the actual error.
-          throw new SparkException("The output file already exists but this could be due to a " +
-            "failure from an earlier attempt. Look through the earlier logs or stage page for " +
-            "the first error.\n File exists error: " + e)
-        }
-        throw e
-    }
+    val writer = newOutputWriter(getWorkPath)
     writer.initConverter(dataSchema)
 
     var writerClosed = false
@@ -417,7 +421,7 @@ private[sql] class DynamicPartitionWriterContainer(
     val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
     configuration.set(
       "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
-    val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+    val newWriter = super.newOutputWriter(path.toString)
     newWriter.initConverter(dataSchema)
     newWriter
   }
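
For context, a minimal standalone Scala sketch of the failure mode this series
guards against (not part of the patch; object and path names are invented for
the demo). A direct committer writes straight to the final output path rather
than a task-attempt temporary directory, so a retried task attempt trips over
the file left behind by the failed first attempt. The sketch assumes Hadoop's
local FileSystem, where create() with overwrite = false raises
org.apache.hadoop.fs.FileAlreadyExistsException (true of Hadoop 2.x; other
filesystems may surface a plain IOException instead):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path}

    object DirectCommitRetrySketch {
      def main(args: Array[String]): Unit = {
        val fs = FileSystem.getLocal(new Configuration())
        // Hypothetical final output location targeted directly by the
        // committer; nothing cleans it up between task attempts.
        val finalPath = new Path("/tmp/direct-committer-sketch/part-00000")
        fs.delete(finalPath, false) // start clean so the demo is repeatable

        // Simulate two attempts of the same task. The second attempt finds
        // the file created by the first and fails with "file exists".
        for (attempt <- 1 to 2) {
          try {
            val out = fs.create(finalPath, false) // overwrite = false
            out.writeBytes("row data\n")
            out.close()
            println(s"attempt $attempt: wrote $finalPath")
          } catch {
            case e: FileAlreadyExistsException =>
              // The patch wraps this in a SparkException telling the user to
              // look for the first error in the earlier attempt's logs.
              println(s"attempt $attempt: ${e.getMessage}")
          }
        }
      }
    }

This is why the series catches FileAlreadyExistsException at writer creation,
and why PATCH 4/4 hoists the handling into BaseWriterContainer.newOutputWriter
so that DefaultWriterContainer and DynamicPartitionWriterContainer share it.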