Commit 6a6317d
[SPARK-53037][CORE][SQL][K8S] Support closeQuietly in SparkErrorUtils
### What changes were proposed in this pull request?

This PR aims to support `closeQuietly` in `SparkErrorUtils`.

### Why are the changes needed?

To provide a way to close a closable object without any exceptions and logs.

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51742 from dongjoon-hyun/SPARK-53037.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 293342d commit 6a6317d

File tree

16 files changed: +43 -27 lines


common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala

Lines changed: 11 additions & 0 deletions

@@ -105,6 +105,17 @@ private[spark] trait SparkErrorUtils extends Logging {
     }
     new String(out.toByteArray, UTF_8)
   }
+
+  /** Try to close by ignoring all exceptions. This is different from JavaUtils.closeQuietly. */
+  def closeQuietly(closeable: Closeable): Unit = {
+    if (closeable != null) {
+      try {
+        closeable.close()
+      } catch {
+        case _: Exception =>
+      }
+    }
+  }
 }
 
 private[spark] object SparkErrorUtils extends SparkErrorUtils
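For illustration, a minimal sketch of how the new helper behaves. The `failing` value is hypothetical, and since the trait is `private[spark]`, such a call only compiles inside Spark's own packages. The core call sites below use `Utils.closeQuietly` without defining it locally, so `Utils` appears to reach this method by mixing in the `SparkErrorUtils` trait:

    import java.io.{Closeable, IOException}

    import org.apache.spark.util.SparkErrorUtils

    // Hypothetical resource whose close() always fails.
    val failing: Closeable = () => throw new IOException("already closed")

    // Both calls return normally: any exception from close() is swallowed
    // without logging, and a null argument is simply ignored.
    SparkErrorUtils.closeQuietly(failing)
    SparkErrorUtils.closeQuietly(null)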

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 1 addition & 2 deletions

@@ -27,7 +27,6 @@
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.io.IOUtils;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -886,7 +885,7 @@ public void close() throws IOException {
 
   private void closeIfPossible(UnsafeSorterIterator iterator) {
     if (iterator instanceof Closeable closeable) {
-      IOUtils.closeQuietly((closeable));
+      Utils.closeQuietly((closeable));
     }
   }
 }

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 2 deletions

@@ -36,7 +36,6 @@ import scala.util.control.NonFatal
 import com.codahale.metrics.{MetricRegistry, MetricSet}
 import com.esotericsoftware.kryo.KryoException
 import com.google.common.cache.CacheBuilder
-import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
@@ -373,7 +372,7 @@ private[spark] class BlockManager(
         logInfo(extendMessageWithBlockDetails(ex.getMessage, blockId))
         throw ex
       } finally {
-        IOUtils.closeQuietly(inputStream)
+        Utils.closeQuietly(inputStream)
       }
     }
 

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 1 addition & 2 deletions

@@ -30,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.{Failure, Success}
 
 import io.netty.util.internal.OutOfDirectMemoryError
-import org.apache.commons.io.IOUtils
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.{MapOutputTracker, SparkException, TaskContext}
@@ -1408,7 +1407,7 @@ private class BufferReleasingInputStream(
         val diagnosisResponse = checkedInOpt.map { checkedIn =>
           iterator.diagnoseCorruption(checkedIn, address, blockId)
         }
-        IOUtils.closeQuietly(this)
+        Utils.closeQuietly(this)
         // We'd never retry the block whatever the cause is since the block has been
         // partially consumed by downstream RDDs.
         iterator.throwFetchFailedException(blockId, mapIndex, address, e, diagnosisResponse)

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 2 additions & 2 deletions

@@ -3084,8 +3084,8 @@ private[spark] object Utils
       logDebug(log"Unzipped from ${MDC(PATH, dfsZipFile)}\n\t${MDC(PATHS, files.mkString("\n\t"))}")
     } finally {
       // Close everything no matter what happened
-      IOUtils.closeQuietly(in)
-      IOUtils.closeQuietly(out)
+      Utils.closeQuietly(in)
+      Utils.closeQuietly(out)
     }
     files.toSeq
   }

core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala

Lines changed: 3 additions & 2 deletions

@@ -27,6 +27,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.{config, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.Utils
 
 /**
  * Continuously appends data from input stream into the given file, and rolls
@@ -96,8 +97,8 @@ private[spark] class RollingFileAppender(
         gzOutputStream.close()
         activeFile.delete()
       } finally {
-        IOUtils.closeQuietly(inputStream)
-        IOUtils.closeQuietly(gzOutputStream)
+        Utils.closeQuietly(inputStream)
+        Utils.closeQuietly(gzOutputStream)
       }
     } else {
       Files.move(activeFile, rolloverFile)

core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala

Lines changed: 3 additions & 2 deletions

@@ -22,6 +22,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import org.apache.commons.io.IOUtils
 
+import org.apache.spark.util.Utils
 
 class JobArtifactSetSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -33,8 +34,8 @@ class JobArtifactSetSuite extends SparkFunSuite with LocalSparkContext {
     val zipEntry = new ZipEntry(fileToZip.getName)
     zipOut.putNextEntry(zipEntry)
     IOUtils.copy(fis, zipOut)
-    IOUtils.closeQuietly(fis)
-    IOUtils.closeQuietly(zipOut)
+    Utils.closeQuietly(fis)
+    Utils.closeQuietly(zipOut)
   }
 
   test("JobArtifactSet uses resources from SparkContext") {

core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -35,6 +35,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config
+import org.apache.spark.util.Utils
 import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy}
 
 class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter {
@@ -389,7 +390,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter {
       try {
         IOUtils.toString(inputStream, StandardCharsets.UTF_8)
       } finally {
-        IOUtils.closeQuietly(inputStream)
+        Utils.closeQuietly(inputStream)
       }
     } else {
       Files.asCharSource(file, StandardCharsets.UTF_8).read()

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -248,8 +248,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
       assert(mergedStream.read() === -1)
       assert(byteBufferInputStream.chunkedByteBuffer === null)
     } finally {
-      IOUtils.closeQuietly(mergedStream)
-      IOUtils.closeQuietly(in)
+      Utils.closeQuietly(mergedStream)
+      Utils.closeQuietly(in)
    }
  }
}
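As a hedged sketch of how the new behavior could be exercised in a suite like this one (this test is hypothetical, not part of the commit):

    import java.io.{Closeable, IOException}

    test("closeQuietly swallows exceptions and tolerates null") {
      // Hypothetical resource whose close() always fails.
      val throwing: Closeable = () => throw new IOException("boom")
      Utils.closeQuietly(throwing) // must not throw
      Utils.closeQuietly(null)     // must not throw
    }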

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 3 additions & 2 deletions

@@ -33,6 +33,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream
 
 import org.apache.spark.{SPARK_VERSION, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.SparkErrorUtils
 
 object Utils extends Logging {
 
@@ -146,8 +147,8 @@ object Utils extends Logging {
     val zipEntry = new ZipEntry(fileToZip.getName)
     zipOut.putNextEntry(zipEntry)
     IOUtils.copy(fis, zipOut)
-    IOUtils.closeQuietly(fis)
-    IOUtils.closeQuietly(zipOut)
+    SparkErrorUtils.closeQuietly(fis)
+    SparkErrorUtils.closeQuietly(zipOut)
   }
 
   def createTarGzFile(inFile: String, outFile: String): Unit = {
