From 071fdd1bc68585ea597342df6f76aa958317dd0f Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Mon, 28 Apr 2014 12:05:54 +0800
Subject: [PATCH 1/5] SPARK-1656: Fix potential resource leaks

---
 .../spark/broadcast/HttpBroadcast.scala      | 26 ++++++++++++-------
 .../spark/deploy/SparkSubmitArguments.scala  | 19 +++++++++-----
 .../master/FileSystemPersistenceEngine.scala | 16 +++++++++---
 .../org/apache/spark/storage/DiskStore.scala |  7 ++++-
 4 files changed, 46 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 29372f16f2cac..4e7abb74d08b9 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -159,18 +159,24 @@ private[spark] object HttpBroadcast extends Logging {
 
   def write(id: Long, value: Any) {
     val file = getFile(id)
-    val out: OutputStream = {
-      if (compress) {
-        compressionCodec.compressedOutputStream(new FileOutputStream(file))
-      } else {
-        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
+    val fileOutputStream = new FileOutputStream(file)
+    try {
+      val out: OutputStream = {
+        if (compress) {
+          compressionCodec.compressedOutputStream(fileOutputStream)
+        } else {
+          new BufferedOutputStream(fileOutputStream, bufferSize)
+        }
       }
+      val ser = SparkEnv.get.serializer.newInstance()
+      val serOut = ser.serializeStream(out)
+      serOut.writeObject(value)
+      serOut.close()
+      files += file.getAbsolutePath
+    }
+    finally {
+      fileOutputStream.close
     }
-    val ser = SparkEnv.get.serializer.newInstance()
-    val serOut = ser.serializeStream(out)
-    serOut.writeObject(value)
-    serOut.close()
-    files += file.getAbsolutePath
   }
 
   def read[T](id: Long): T = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 58d9e9add764a..9a12b9b56d309 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -325,14 +325,19 @@ object SparkSubmitArguments {
   def getPropertiesFromFile(file: File): Seq[(String, String)] = {
     require(file.exists(), s"Properties file ${file.getName} does not exist")
     val inputStream = new FileInputStream(file)
-    val properties = new Properties()
     try {
-      properties.load(inputStream)
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file ${file.getName}"
-        throw new SparkException(message, e)
+      val properties = new Properties()
+      try {
+        properties.load(inputStream)
+      } catch {
+        case e: IOException =>
+          val message = s"Failed when loading Spark properties file ${file.getName}"
+          throw new SparkException(message, e)
+      }
+      properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
+    }
+    finally {
+      inputStream.close
     }
-    properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index aa85aa060d9c1..f05ef7e0860b9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -83,15 +83,23 @@ private[spark] class FileSystemPersistenceEngine(
     val serialized = serializer.toBinary(value)
 
     val out = new FileOutputStream(file)
-    out.write(serialized)
-    out.close()
+    try {
+      out.write(serialized)
+    }
+    finally {
+      out.close()
+    }
   }
 
   def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
-    dis.readFully(fileData)
-    dis.close()
+    try {
+      dis.readFully(fileData)
+    }
+    finally {
+      dis.close()
+    }
 
     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 0ab9fad422717..5f22e2c6e64f2 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -77,7 +77,12 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
-    blockManager.dataSerializeStream(blockId, outputStream, values)
+    try {
+      blockManager.dataSerializeStream(blockId, outputStream, values)
+    }
+    finally {
+      outputStream.close
+    }
     val length = file.length
     val timeTaken = System.currentTimeMillis - startTime
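[Editor's note] Patch 1 applies the same discipline in four places: keep the raw
stream in a local, do the work inside try, and close in finally so the file
handle is released even when serialization or loading throws. The idiom
generalizes to a small loan-pattern helper; the sketch below is illustrative
only and not part of these patches (the name withResource is hypothetical):

    import java.io.Closeable

    // Hypothetical loan-pattern helper: runs f on the resource and
    // guarantees close() executes whether f succeeds or throws.
    def withResource[R <: Closeable, T](resource: R)(f: R => T): T = {
      try {
        f(resource)
      } finally {
        resource.close()
      }
    }

    // Example use: the stream is closed even if write throws.
    // withResource(new java.io.FileOutputStream(file)) { out => out.write(bytes) }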
From afc3383f9331e203048e13c2bc04fa7fa514030b Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Tue, 6 May 2014 21:36:35 +0800
Subject: [PATCH 2/5] Update to follow the code style

---
 .../apache/spark/broadcast/HttpBroadcast.scala |  5 ++---
 .../spark/deploy/SparkSubmitArguments.scala    | 17 +++++++----------
 .../master/FileSystemPersistenceEngine.scala   |  6 ++----
 .../org/apache/spark/storage/DiskStore.scala   | 11 ++++++++---
 4 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 4e7abb74d08b9..ace0212fee110 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -173,9 +173,8 @@ private[spark] object HttpBroadcast extends Logging {
       serOut.writeObject(value)
       serOut.close()
       files += file.getAbsolutePath
-    }
-    finally {
-      fileOutputStream.close
+    } finally {
+      fileOutputStream.close()
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9a12b9b56d309..9d0dfdc5a448e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -327,17 +327,14 @@ object SparkSubmitArguments {
     val inputStream = new FileInputStream(file)
     try {
       val properties = new Properties()
-      try {
-        properties.load(inputStream)
-      } catch {
-        case e: IOException =>
-          val message = s"Failed when loading Spark properties file ${file.getName}"
-          throw new SparkException(message, e)
-      }
+      properties.load(inputStream)
       properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
-    }
-    finally {
-      inputStream.close
+    } catch {
+      case e: IOException =>
+        val message = s"Failed when loading Spark properties file ${file.getName}"
+        throw new SparkException(message, e)
+    } finally {
+      inputStream.close()
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index f05ef7e0860b9..08a99bbe68578 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -85,8 +85,7 @@ private[spark] class FileSystemPersistenceEngine(
     val out = new FileOutputStream(file)
     try {
       out.write(serialized)
-    }
-    finally {
+    } finally {
       out.close()
     }
   }
@@ -96,8 +95,7 @@ private[spark] class FileSystemPersistenceEngine(
     val dis = new DataInputStream(new FileInputStream(file))
     try {
       dis.readFully(fileData)
-    }
-    finally {
+    } finally {
       dis.close()
     }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 5f22e2c6e64f2..20a6f1228cb55 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -79,10 +79,15 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     val outputStream = new FileOutputStream(file)
     try {
       blockManager.dataSerializeStream(blockId, outputStream, values)
+      outputStream.close()
+    } catch {
+      case e: Throwable => {
+        outputStream.close()
+        if(file.exists()) file.delete()
+        throw e
+      }
     }
-    finally {
-      outputStream.close
-    }
+
     val length = file.length
     val timeTaken = System.currentTimeMillis - startTime
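[Editor's note] Patch 2 tightens the style: finally moves onto the closing
brace, close() gains parentheses (the Scala convention for side-effecting
calls), and the nested try in SparkSubmitArguments collapses into a single
try/catch/finally. A minimal standalone sketch of the resulting shape,
assuming simplified names and a plain RuntimeException in place of Spark's
SparkException:

    import java.io.{File, FileInputStream, IOException}
    import java.util.Properties

    def loadProps(file: File): Properties = {
      val in = new FileInputStream(file)
      try {
        val props = new Properties()
        props.load(in) // may throw IOException
        props
      } catch {
        case e: IOException =>
          // Re-wrap so the error names the offending file.
          throw new RuntimeException(s"Failed to load ${file.getName}", e)
      } finally {
        in.close() // runs whether load succeeded or threw
      }
    }

Note that the DiskStore change in this patch trades finally for an explicit
catch that closes the stream, deletes the partial file, and rethrows; the
remaining patches refine exactly that path.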
From 28b90dcbda343a25bc0f3e49f3eca5150ce61ec0 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Fri, 12 Sep 2014 22:46:48 +0800
Subject: [PATCH 3/5] Update to follow the code style

---
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 8f7a86eb7259f..45a42f9e5f92c 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -79,7 +79,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     } catch {
       case e: Throwable => {
         outputStream.close()
-        if(file.exists()) file.delete()
+        if(file.exists()) {
+          file.delete()
+        }
         throw e
       }
     }
From 2de96e56fc20335f286a27e1a5979f030ef61c5f Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sun, 14 Sep 2014 00:13:27 +0800
Subject: [PATCH 4/5] Make sure file will be deleted if exception happens

---
 .../main/scala/org/apache/spark/storage/DiskStore.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 45a42f9e5f92c..86aa6883c0585 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -74,11 +74,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
     try {
-      blockManager.dataSerializeStream(blockId, outputStream, values)
-      outputStream.close()
+      try {
+        blockManager.dataSerializeStream(blockId, outputStream, values)
+      } finally {
+        outputStream.close()
+      }
     } catch {
       case e: Throwable => {
-        outputStream.close()
         if(file.exists()) {
           file.delete()
         }
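[Editor's note] Patch 3 only adds braces around the one-line if. Patch 4 closes
a real gap left by patch 2: there, the failure path closed the stream inside
the catch, so if that close() itself threw, the partial file was never
deleted. Nesting a try/finally inside the outer try/catch guarantees the
stream is closed on every path before the catch deletes the partial file.
A standalone sketch of that ordering, with the hypothetical serialize
standing in for blockManager.dataSerializeStream:

    import java.io.{File, FileOutputStream}

    def writeBlock(file: File)(serialize: FileOutputStream => Unit): Unit = {
      val out = new FileOutputStream(file)
      try {
        try {
          serialize(out)
        } finally {
          out.close() // always runs, and before any delete below
        }
      } catch {
        case e: Throwable =>
          // A failed write must not leave a partial block behind.
          if (file.exists()) {
            file.delete()
          }
          throw e
      }
    }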
From c431095b05a84ca26e532e63c4b54f08f29abc78 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 17 Sep 2014 09:56:42 +0800
Subject: [PATCH 5/5] Add a comment and fix the code style

---
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 86aa6883c0585..714c0064eeb28 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -77,11 +77,12 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
       try {
         blockManager.dataSerializeStream(blockId, outputStream, values)
       } finally {
+        // Close outputStream here because it should be closed before file is deleted.
        outputStream.close()
       }
     } catch {
       case e: Throwable => {
-        if(file.exists()) {
+        if (file.exists()) {
           file.delete()
         }
         throw e
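[Editor's note] Patch 5 records the invariant from patch 4 in a comment: the
stream must be closed before file.delete(), because on some platforms
(notably Windows) deleting a file that still has an open handle fails, and
the partial block would be left on disk. A quick illustrative check, not
part of the patches:

    import java.io.{File, FileOutputStream}

    val f = File.createTempFile("block", ".tmp")
    val out = new FileOutputStream(f)
    out.write(Array[Byte](1, 2, 3))
    out.close()          // close first...
    println(f.delete())  // ...then delete: prints true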