Skip to content

Commit bf543d8

Browse files
committed
Move iterator.hasNext into try block
1 parent d3eb99f commit bf543d8

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -383,32 +383,38 @@ object FileFormatWriter extends Logging {
383383

384384
committer.setupTask(taskAttemptContext)
385385

386-
val dataWriter =
387-
if (sparkPartitionId != 0 && !iterator.hasNext) {
388-
// In case of empty job, leave first partition to save meta for file format like parquet.
389-
new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
390-
} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
391-
new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
392-
} else {
393-
concurrentOutputWriterSpec match {
394-
case Some(spec) =>
395-
new DynamicPartitionDataConcurrentWriter(
396-
description, taskAttemptContext, committer, spec)
397-
case _ =>
398-
new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
399-
}
400-
}
386+
var dataWriter: FileFormatDataWriter = null
401387

402388
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
389+
dataWriter =
390+
if (sparkPartitionId != 0 && !iterator.hasNext) {
391+
// In case of empty job, leave first partition to save meta for file format like parquet.
392+
new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
393+
} else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
394+
new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
395+
} else {
396+
concurrentOutputWriterSpec match {
397+
case Some(spec) =>
398+
new DynamicPartitionDataConcurrentWriter(
399+
description, taskAttemptContext, committer, spec)
400+
case _ =>
401+
new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
402+
}
403+
}
404+
403405
// Execute the task to write rows out and commit the task.
404406
dataWriter.writeWithIterator(iterator)
405407
dataWriter.commit()
406408
})(catchBlock = {
407409
// If there is an error, abort the task
408-
dataWriter.abort()
410+
if (dataWriter != null) {
411+
dataWriter.abort()
412+
}
409413
logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
410414
}, finallyBlock = {
411-
dataWriter.close()
415+
if (dataWriter != null) {
416+
dataWriter.close()
417+
}
412418
})
413419
}
414420

0 commit comments

Comments
 (0)