@@ -383,32 +383,41 @@ object FileFormatWriter extends Logging {

committer.setupTask(taskAttemptContext)

val dataWriter =
if (sparkPartitionId != 0 && !iterator.hasNext) {
// In case of empty job, leave first partition to save meta for file format like parquet.
new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
} else {
concurrentOutputWriterSpec match {
case Some(spec) =>
new DynamicPartitionDataConcurrentWriter(
description, taskAttemptContext, committer, spec)
case _ =>
new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
}
}
var dataWriter: FileFormatDataWriter = null

Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
dataWriter =
if (sparkPartitionId != 0 && !iterator.hasNext) {
@viirya (Member Author) commented on Oct 5, 2024:
While looking into a customer issue, I saw the following error:

org.apache.spark.shuffle.FetchFailedException: Block shuffle_1_106_21 is corrupted but checksum verification passed

which is thrown at iterator.hasNext.

But I don't see the aborted-task log. That is because hasNext is evaluated outside the try block, which is not good style.

Besides, since setupTask has already been called, it is safer to call abort in any error case. (A minimal sketch of the resulting control flow appears after the diff below.)

// In case of empty job, leave first partition to save meta for file format like parquet.
new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
} else {
concurrentOutputWriterSpec match {
case Some(spec) =>
new DynamicPartitionDataConcurrentWriter(
description, taskAttemptContext, committer, spec)
case _ =>
new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
}
}

// Execute the task to write rows out and commit the task.
dataWriter.writeWithIterator(iterator)
dataWriter.commit()
})(catchBlock = {
// If there is an error, abort the task
dataWriter.abort()
logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
if (dataWriter != null) {
dataWriter.abort()
} else {
committer.abortTask(taskAttemptContext)
}
logError(log"Job: ${MDC(JOB_ID, jobId)}, Task: ${MDC(TASK_ID, taskId)}, " +
log"Task attempt ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} aborted.")
Comment on lines +415 to +416 — @viirya (Member Author) commented:

We were previously missing the task and task attempt info in this log message; adding it here.

}, finallyBlock = {
dataWriter.close()
if (dataWriter != null) {
dataWriter.close()
}
})
}
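
For readers following along, here is a minimal, self-contained sketch of the control flow this change adopts: the writer is now constructed inside the guarded block, so an exception thrown while pulling the first row (for example a FetchFailedException from iterator.hasNext) also reaches the abort path, and the catch/finally blocks guard against a still-null writer. The names below (tryWithCallbacks, SketchWriter, newWriter, abortTask) are hypothetical stand-ins for illustration, not Spark APIs; the real code uses Utils.tryWithSafeFinallyAndFailureCallbacks, FileFormatDataWriter, and the committer's abortTask.

```scala
// Hypothetical stand-ins for illustration only; not the Spark implementation.
object WriterGuardSketch {

  trait SketchWriter {
    def writeAll(rows: Iterator[String]): Unit
    def commit(): Unit
    def abort(): Unit
    def close(): Unit
  }

  // Rough stand-in for Utils.tryWithSafeFinallyAndFailureCallbacks(block)(catchBlock, finallyBlock):
  // run the block, run catchBlock on failure (suppressing its own errors), always run finallyBlock.
  def tryWithCallbacks[T](block: => T)(catchBlock: => Unit, finallyBlock: => Unit): T = {
    try {
      block
    } catch {
      case t: Throwable =>
        try catchBlock catch { case s: Throwable => t.addSuppressed(s) }
        throw t
    } finally {
      finallyBlock
    }
  }

  def runTask(
      iterator: Iterator[String],
      newWriter: () => SketchWriter,   // may itself consult iterator.hasNext, as in the PR
      abortTask: () => Unit): Unit = {
    var writer: SketchWriter = null    // stays null if writer construction fails

    tryWithCallbacks {
      // Constructing the writer inside the guarded block means a failure here
      // (e.g. from iterator.hasNext) still reaches the abort path below.
      writer = newWriter()
      writer.writeAll(iterator)
      writer.commit()
    }(catchBlock = {
      // Abort via the writer if it exists, otherwise fall back to task-level abort,
      // which is safe because setupTask has already been called.
      if (writer != null) writer.abort() else abortTask()
    }, finallyBlock = {
      if (writer != null) writer.close()
    })
  }
}
```

With the old structure, a failure in iterator.hasNext occurred before the guarded block, so neither the abort nor the aborted-task log ran.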
