Skip to content

Commit 7e2d0a4

Browse files
committed
make sure we close current writer
1 parent 8100100 commit 7e2d0a4

File tree

1 file changed

+19
-12
lines changed

1 file changed

+19
-12
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -337,20 +337,27 @@ private[sql] class DynamicPartitionWriterContainer(
337337
val sortedIterator = sorter.sortedIterator()
338338
var currentKey: InternalRow = null
339339
var currentWriter: OutputWriter = null
340-
while(sortedIterator.next()) {
341-
if (currentKey != sortedIterator.getKey) {
342-
if (currentWriter != null) { currentWriter.close() }
343-
currentKey = sortedIterator.getKey.copy()
344-
logDebug(s"Writing partition: $currentKey")
345-
346-
// Either use an existing file from before, or open a new one.
347-
currentWriter = outputWriters.remove(currentKey)
348-
if (currentWriter == null) { currentWriter = newOutputWriter(currentKey) }
340+
try {
341+
while (sortedIterator.next()) {
342+
if (currentKey != sortedIterator.getKey) {
343+
if (currentWriter != null) {
344+
currentWriter.close()
345+
}
346+
currentKey = sortedIterator.getKey.copy()
347+
logDebug(s"Writing partition: $currentKey")
348+
349+
// Either use an existing file from before, or open a new one.
350+
currentWriter = outputWriters.remove(currentKey)
351+
if (currentWriter == null) {
352+
currentWriter = newOutputWriter(currentKey)
353+
}
354+
}
355+
356+
currentWriter.writeInternal(sortedIterator.getValue)
349357
}
350-
351-
currentWriter.writeInternal(sortedIterator.getValue)
358+
} finally {
359+
if (currentWriter != null) { currentWriter.close() }
352360
}
353-
currentWriter.close()
354361
}
355362

356363
commitTask()

0 commit comments

Comments
 (0)