We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 4466010 commit a214755Copy full SHA for a214755
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -68,20 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
68
}
69
datasetWithIncrementalExecution.foreachPartition { iter =>
70
if (writer.open(TaskContext.getPartitionId(), batchId)) {
71
- var isFailed = false
72
try {
73
while (iter.hasNext) {
74
writer.process(iter.next())
75
76
} catch {
77
case e: Throwable =>
78
- isFailed = true
79
writer.close(e)
80
throw e
81
82
- if (!isFailed) {
83
- writer.close(null)
84
- }
+ writer.close(null)
85
} else {
86
writer.close(null)
87
0 commit comments