From e9f990b1f9f939ec7d7a0f1174b6c20e6c8605d6 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 4 Oct 2024 23:43:14 -0700
Subject: [PATCH 1/2] Move iterator.hasNext into try block

---
 .../datasources/FileFormatWriter.scala        | 42 +++++++++++--------
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 91749ddd794fb..0515435532822 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -383,32 +383,40 @@ object FileFormatWriter extends Logging {
 
     committer.setupTask(taskAttemptContext)
 
-    val dataWriter =
-      if (sparkPartitionId != 0 && !iterator.hasNext) {
-        // In case of empty job, leave first partition to save meta for file format like parquet.
-        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
-        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else {
-        concurrentOutputWriterSpec match {
-          case Some(spec) =>
-            new DynamicPartitionDataConcurrentWriter(
-              description, taskAttemptContext, committer, spec)
-          case _ =>
-            new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
-        }
-      }
+    var dataWriter: FileFormatDataWriter = null
 
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+      dataWriter =
+        if (sparkPartitionId != 0 && !iterator.hasNext) {
+          // In case of empty job, leave first partition to save meta for file format like parquet.
+          new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
+        } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
+          new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+        } else {
+          concurrentOutputWriterSpec match {
+            case Some(spec) =>
+              new DynamicPartitionDataConcurrentWriter(
+                description, taskAttemptContext, committer, spec)
+            case _ =>
+              new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
+          }
+        }
+
       // Execute the task to write rows out and commit the task.
       dataWriter.writeWithIterator(iterator)
       dataWriter.commit()
     })(catchBlock = {
       // If there is an error, abort the task
-      dataWriter.abort()
+      if (dataWriter != null) {
+        dataWriter.abort()
+      } else {
+        committer.abortTask(taskAttemptContext)
+      }
       logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
     }, finallyBlock = {
-      dataWriter.close()
+      if (dataWriter != null) {
+        dataWriter.close()
+      }
     })
   }
 

From a942699b2d7f47ee7d7474da5819e108ce4c28d0 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 5 Oct 2024 14:45:31 -0700
Subject: [PATCH 2/2] Add more log info

---
 .../spark/sql/execution/datasources/FileFormatWriter.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 0515435532822..5e6107c4f49c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -412,7 +412,8 @@ object FileFormatWriter extends Logging {
       } else {
         committer.abortTask(taskAttemptContext)
       }
-      logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
+      logError(log"Job: ${MDC(JOB_ID, jobId)}, Task: ${MDC(TASK_ID, taskId)}, " +
+        log"Task attempt ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} aborted.")
     }, finallyBlock = {
       if (dataWriter != null) {
         dataWriter.close()
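
The heart of PATCH 1/2 is a resource-lifecycle fix: iterator.hasNext can itself throw (for example, when an upstream read failure surfaces while probing for rows), and before this change that exception escaped before dataWriter existed, so neither dataWriter.abort() nor committer.abortTask() ran. The sketch below is a minimal, standalone distillation of the shape the patch gives executeTask; it is not Spark's actual code, and DataWriter, Committer, and the executeTask signature here are hypothetical stand-ins for FileFormatDataWriter, the Hadoop commit protocol, and the real method.

object WriterLifecycleSketch {
  // Hypothetical stand-ins for FileFormatDataWriter and the commit protocol.
  trait DataWriter {
    def writeAll(rows: Iterator[String]): Unit
    def commit(): Unit
    def abort(): Unit
    def close(): Unit
  }

  trait Committer {
    def abortTask(): Unit
  }

  def executeTask(
      iterator: Iterator[String],
      committer: Committer,
      newEmptyWriter: () => DataWriter,
      newWriter: () => DataWriter): Unit = {
    var dataWriter: DataWriter = null
    try {
      // Probing the iterator now happens inside the guarded region: before
      // the patch this hasNext call ran outside it, so an exception thrown
      // here bypassed abort handling entirely.
      dataWriter = if (!iterator.hasNext) newEmptyWriter() else newWriter()
      dataWriter.writeAll(iterator)
      dataWriter.commit()
    } catch {
      case t: Throwable =>
        // dataWriter is still null if hasNext or construction threw; fall
        // back to aborting the task directly through the committer.
        if (dataWriter != null) dataWriter.abort() else committer.abortTask()
        throw t
    } finally {
      // Cleanup must likewise tolerate a writer that was never created.
      if (dataWriter != null) dataWriter.close()
    }
  }
}

In the real patch, Utils.tryWithSafeFinallyAndFailureCallbacks supplies roughly this try/catch/finally shape while also notifying task failure callbacks and suppressing secondary exceptions thrown during cleanup, which is why the catchBlock and finallyBlock bodies only need the null guards.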
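
PATCH 2/2 enriches the abort log, which previously named only the job. One job fans out to many tasks and a task may be retried, so the task and attempt IDs are what make an abort line attributable in executor logs. The snippet below is a plain-Scala emulation of just the rendered message text, with a made-up job ID for illustration; the real code uses Spark's structured log"..." interpolator with MDC keys (JOB_ID, TASK_ID, TASK_ATTEMPT_ID) so the fields stay machine-indexable rather than being baked into a flat string.

object AbortLogSketch {
  // Builds the same text as the patched logError call; jobId, taskId and
  // taskAttemptId mirror values FileFormatWriter already has in scope.
  def abortMessage(jobId: String, taskId: Long, taskAttemptId: Long): String =
    s"Job: $jobId, Task: $taskId, Task attempt $taskAttemptId aborted."

  def main(args: Array[String]): Unit =
    println(abortMessage("job-20241005-0001", taskId = 17L, taskAttemptId = 0L))
}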