
Commit dadb720

colinma authored and srowen committed
[SPARK-28340][CORE] Noisy exceptions when tasks are killed: "DiskBloc…
### What changes were proposed in this pull request?

When a Spark task is killed (by an intentional job kill, the automated killing of redundant speculative tasks, etc.) while it has an unfinished I/O operation on an AbstractInterruptibleChannel, a ClosedByInterruptException is thrown. A single cancelled task can result in hundreds of ClosedByInterruptException stack traces being logged. With this PR, the stack trace of ClosedByInterruptException is no longer logged, mirroring what Executor.run already does for InterruptedException.

### Why are the changes needed?

Large numbers of spurious exceptions are confusing to users who are inspecting Spark logs to diagnose other issues.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

N/A

Closes #25674 from colinmjj/spark-28340.

Authored-by: colinma <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
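To make the failure mode concrete, here is a minimal, self-contained Scala sketch (not part of the commit; `InterruptDemo` and its temp-file handling are illustrative). `FileChannel` extends `AbstractInterruptibleChannel`, so interrupting a thread that is performing channel I/O closes the channel and surfaces as `ClosedByInterruptException`:

```scala
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.{ClosedByInterruptException, FileChannel}
import java.nio.file.StandardOpenOption

object InterruptDemo {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("interrupt-demo", ".bin")
    file.deleteOnExit()
    val writer = new Thread(() => {
      val channel = FileChannel.open(file.toPath, StandardOpenOption.WRITE)
      try {
        val buf = ByteBuffer.allocate(1024)
        while (true) { // keep writing until the interrupt arrives
          buf.rewind()
          channel.write(buf)
        }
      } catch {
        // The interrupt machinery closes the channel for us; log only the
        // message, as the patched code does, rather than a full stack trace.
        case ce: ClosedByInterruptException =>
          println("write aborted by interrupt: " + ce.getMessage)
      }
    })
    writer.start()
    Thread.sleep(100)  // let the writer get into the I/O loop
    writer.interrupt() // simulates a task kill
    writer.join()
  }
}
```

Before this commit, each such kill logged the full stack trace of that exception at the code paths patched below.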
1 parent 4a3a6b6 commit dadb720

File tree

2 files changed: +23 −4 lines

core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala

Lines changed: 7 additions & 1 deletion

```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
-import java.nio.channels.FileChannel
+import java.nio.channels.{ClosedByInterruptException, FileChannel}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager}
@@ -219,6 +219,12 @@ private[spark] class DiskBlockObjectWriter(
       truncateStream = new FileOutputStream(file, true)
       truncateStream.getChannel.truncate(committedPosition)
     } catch {
+      // ClosedByInterruptException is an expected exception when a task is killed;
+      // don't log the exception stack trace to avoid confusing users.
+      // See: SPARK-28340
+      case ce: ClosedByInterruptException =>
+        logError("Exception occurred while reverting partial writes to file "
+          + file + ", " + ce.getMessage)
       case e: Exception =>
         logError("Uncaught exception while reverting partial writes to file " + file, e)
     } finally {
```
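This fix relies on Scala trying `catch` cases in order: `ClosedByInterruptException` extends `IOException` (and hence `Exception`), so the specific case must come first or it would never match. A standalone sketch of the same shape (the helper `revertWithQuietInterrupt` and its parameters are hypothetical, not Spark API):

```scala
import java.nio.channels.ClosedByInterruptException

object RevertSketch {
  // Cases are tried top to bottom, so the specific (expected) exception is
  // handled quietly before the generic case keeps its full diagnostics.
  def revertWithQuietInterrupt(revert: () => Unit, log: String => Unit): Unit = {
    try {
      revert()
    } catch {
      case ce: ClosedByInterruptException =>
        // expected on task kill: message only, no stack trace
        log("revert interrupted, " + ce.getMessage)
      case e: Exception =>
        // genuinely unexpected: keep the stack trace
        log("uncaught exception while reverting: " + e)
        e.printStackTrace()
    }
  }
}
```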

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 16 additions & 3 deletions

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{InputStream, IOException}
+import java.nio.channels.ClosedByInterruptException
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 import javax.annotation.concurrent.GuardedBy
 
@@ -349,9 +350,16 @@ final class ShuffleBlockFetcherIterator(
         results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId,
           buf.size(), buf, false))
       } catch {
+        // If we see an exception, stop immediately.
         case e: Exception =>
-          // If we see an exception, stop immediately.
-          logError(s"Error occurred while fetching local blocks", e)
+          e match {
+            // ClosedByInterruptException is an expected exception when a task is killed;
+            // don't log the exception stack trace to avoid confusing users.
+            // See: SPARK-28340
+            case ce: ClosedByInterruptException =>
+              logError("Error occurred while fetching local blocks, " + ce.getMessage)
+            case ex: Exception => logError("Error occurred while fetching local blocks", ex)
+          }
           results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
           return
       }
@@ -454,7 +462,12 @@
           // The exception could only be thrown by a local shuffle block
           case e: IOException =>
             assert(buf.isInstanceOf[FileSegmentManagedBuffer])
-            logError("Failed to create input stream from local block", e)
+            e match {
+              case ce: ClosedByInterruptException =>
+                logError("Failed to create input stream from local block, " +
+                  ce.getMessage)
+              case e: IOException => logError("Failed to create input stream from local block", e)
+            }
             buf.release()
             throwFetchFailedException(blockId, address, e)
         }
```
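In this file the shape differs from DiskBlockObjectWriter: the shared failure handling (enqueueing a `FailureFetchResult`, releasing the buffer, returning or rethrowing) must run for both exception kinds, so the commit nests a `match` inside a single outer `case` instead of duplicating that tail. A standalone sketch of the pattern (`fetchWithQuietInterrupt`, `onFailure`, and `log` are illustrative names, not Spark API):

```scala
import java.nio.channels.ClosedByInterruptException

object FetchSketch {
  // One outer case keeps the shared failure path in a single place; the
  // inner match only varies how the error is logged.
  def fetchWithQuietInterrupt(
      fetch: () => Unit,
      onFailure: Throwable => Unit,
      log: String => Unit): Unit = {
    try {
      fetch()
    } catch {
      case e: Exception =>
        e match {
          case ce: ClosedByInterruptException =>
            log("fetch interrupted, " + ce.getMessage) // no stack trace
          case other: Exception =>
            log("fetch failed: " + other)              // keep full detail
        }
        onFailure(e) // shared failure path runs for both branches
    }
  }
}
```

Note that the outer binding `e`, not the narrowed `ce`, is what gets passed along, so downstream consumers still see the original exception.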
