From f0dc402d02cbd84332d52526c60e984d45ef6fb4 Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Thu, 18 Feb 2016 14:47:56 -0800
Subject: [PATCH 1/2] SPARK-12729. PhantomReference to replace finalize in
 python broadcast. Redesigned the code to keep track of the creation and
 closing of the phantom reference thread. Fixed null pointer exceptions

---
 .../apache/spark/api/python/PythonRDD.scala   | 64 +++++++++++++++----
 1 file changed, 51 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f12e2dfafa19d..17e1bbb5b848f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -18,11 +18,14 @@ package org.apache.spark.api.python
 
 import java.io._
+import java.lang.ref.PhantomReference
+import java.lang.ref.ReferenceQueue
 import java.net._
 import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.language.existentials
 import scala.util.control.NonFatal
 
 import org.apache.spark._
@@ -871,6 +874,44 @@ private class PythonAccumulatorParam(@transient private val serverHost: String,
  * write the data into disk after deserialization, then Python can read it from disks.
  */
 // scalastyle:off no.finalize
+
+/**
+ * Create a class that extends PhantomReference.
+ */
+private[spark] class FilePhantomReference(@transient var f: File, var q: ReferenceQueue[File])
+  extends PhantomReference(f, q){
+
+  private def cleanup()
+  {
+    f.delete()
+  }
+}
+
+private[spark] class PhantomThread( threadName: String, queue: ReferenceQueue[File],
+    phantomReferences: ListBuffer[FilePhantomReference])
+  extends Thread with Logging {
+
+  def shutdownOnTaskCompletion()
+  {
+    this.interrupt()
+  }
+
+  setDaemon(true)
+  override def run(): Unit = Utils.logUncaughtExceptions {
+    while(phantomReferences.size > 0)
+    {
+      try {
+        val ref = queue.remove().asInstanceOf[FilePhantomReference]
+        phantomReferences -= ref
+      }catch {
+        case ex: InterruptedException => {
+          logDebug("Exception thrown after file object cleanup", ex)
+        }
+      }
+    }
+  }
+}
+
 private[spark] class PythonBroadcast(@transient var path: String) extends Serializable
   with Logging {
 
@@ -892,8 +933,18 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     val dir = new File(Utils.getLocalDir(SparkEnv.get.conf))
     val file = File.createTempFile("broadcast", "", dir)
+    val queue = new ReferenceQueue[File]()
+    val exitWhenFinished=false
+    val phantomReferences = new ListBuffer[FilePhantomReference]()
+    val threadName = "WeakReference"
+    val phantomThread=new PhantomThread(threadName,queue,phantomReferences)
+
     path = file.getAbsolutePath
     val out = new FileOutputStream(file)
+    phantomReferences += new FilePhantomReference(file, queue)
+    phantomThread.start()
+    if(phantomReferences.size == 0)
+      phantomThread.shutdownOnTaskCompletion
     Utils.tryWithSafeFinally {
       Utils.copyStream(in, out)
     } {
@@ -901,18 +952,5 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
       out.close()
     }
   }
-  /**
-   * Delete the file once the object is GCed.
-   */
-  override def finalize() {
-    if (!path.isEmpty) {
-      val file = new File(path)
-      if (file.exists()) {
-        if (!file.delete()) {
-          logWarning(s"Error deleting ${file.getPath}")
-        }
-      }
-    }
-  }
 }
 // scalastyle:on no.finalize
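The patch above leans on the java.lang.ref contract: a PhantomReference is enqueued on its ReferenceQueue only after its referent has become unreachable, the reference object must not itself hold a strong reference to the referent (or the referent can never be collected), and the reference objects themselves must stay strongly reachable until they are dequeued. The following minimal, self-contained Scala sketch shows the usual shape of that pattern; the names TempFileReference, TempFileCleaner, and temp-file-cleaner are invented for illustration and do not appear in the patch.

import java.io.File
import java.lang.ref.{PhantomReference, ReferenceQueue}

import scala.collection.mutable

// Phantom reference that remembers only the *path* of the file to delete.
// Holding the referent (the owner object) here would keep it strongly
// reachable, so it would never be enqueued and never cleaned up.
class TempFileReference(referent: Object, queue: ReferenceQueue[Object], val path: String)
  extends PhantomReference[Object](referent, queue) {

  def cleanup(): Unit = new File(path).delete()
}

object TempFileCleaner {

  private val queue = new ReferenceQueue[Object]()

  // Keep the reference objects strongly reachable until they are dequeued;
  // otherwise the JVM may collect them before they are ever enqueued.
  private val pending = mutable.Set.empty[TempFileReference]

  def register(owner: Object, file: File): Unit = synchronized {
    pending += new TempFileReference(owner, queue, file.getAbsolutePath)
  }

  // One daemon thread for the lifetime of the JVM: it blocks on the queue
  // and deletes a file once that file's owner has been garbage collected.
  private val cleaner = new Thread("temp-file-cleaner") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        val ref = queue.remove().asInstanceOf[TempFileReference]
        TempFileCleaner.synchronized { pending -= ref }
        ref.cleanup()
      }
    }
  }
  cleaner.start()
}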
From d1ded9bdc72ea3c2cc9e374828c9eab40c46bc7c Mon Sep 17 00:00:00 2001
From: GayathriMurali
Date: Fri, 19 Feb 2016 12:17:10 -0800
Subject: [PATCH 2/2] SPARK-12729. Fixed style issues and invoked the cleanup
 method

---
 .../apache/spark/api/python/PythonRDD.scala   | 21 +++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 17e1bbb5b848f..9d2d9b2825e23 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -878,16 +878,16 @@ private class PythonAccumulatorParam(@transient private val serverHost: String,
 /**
  * Create a class that extends PhantomReference.
  */
-private[spark] class FilePhantomReference(@transient var f: File, var q: ReferenceQueue[File])
+class FilePhantomReference(@transient var f: File, var q: ReferenceQueue[File])
   extends PhantomReference(f, q){
 
-  private def cleanup()
+  def cleanup()
   {
     f.delete()
   }
 }
 
-private[spark] class PhantomThread( threadName: String, queue: ReferenceQueue[File],
+class PhantomThread( threadName: String, queue: ReferenceQueue[File],
     phantomReferences: ListBuffer[FilePhantomReference])
   extends Thread with Logging {
 
@@ -895,15 +895,15 @@ private[spark] class PhantomThread( threadName: String, queue: ReferenceQueue[Fi
   {
     this.interrupt()
   }
-
   setDaemon(true)
   override def run(): Unit = Utils.logUncaughtExceptions {
-    while(phantomReferences.size > 0)
+    while (phantomReferences.size > 0)
     {
       try {
         val ref = queue.remove().asInstanceOf[FilePhantomReference]
         phantomReferences -= ref
-      }catch {
+        ref.cleanup()
+      } catch {
         case ex: InterruptedException => {
           logDebug("Exception thrown after file object cleanup", ex)
         }
@@ -934,23 +934,22 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     val dir = new File(Utils.getLocalDir(SparkEnv.get.conf))
     val file = File.createTempFile("broadcast", "", dir)
     val queue = new ReferenceQueue[File]()
-    val exitWhenFinished=false
+    val exitWhenFinished = false
     val phantomReferences = new ListBuffer[FilePhantomReference]()
     val threadName = "WeakReference"
-    val phantomThread=new PhantomThread(threadName,queue,phantomReferences)
-
+    val phantomThread = new PhantomThread(threadName, queue, phantomReferences)
     path = file.getAbsolutePath
     val out = new FileOutputStream(file)
     phantomReferences += new FilePhantomReference(file, queue)
     phantomThread.start()
-    if(phantomReferences.size == 0)
+    if (phantomReferences.size == 0) {
       phantomThread.shutdownOnTaskCompletion
+    }
     Utils.tryWithSafeFinally {
       Utils.copyStream(in, out)
     } {
       out.close()
     }
   }
-
 }
 // scalastyle:on no.finalize
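Under the same illustrative assumptions, usage would mirror PythonBroadcast.readObject: the owner object and its freshly created temp file are registered once, and no finalize() override is needed. A hypothetical caller:

import java.io.File

object TempFileCleanerExample {
  def main(args: Array[String]): Unit = {
    val owner = new Object // stands in for the deserialized PythonBroadcast
    val file = File.createTempFile("broadcast", "")
    TempFileCleaner.register(owner, file)
    // ... write the broadcast data to `file` ...
    // When `owner` becomes unreachable and is collected, the cleaner thread
    // dequeues the phantom reference and deletes `file`.
  }
}

A single JVM-lifetime daemon thread that blocks on the queue is one way to sidestep the per-deserialization start/interrupt bookkeeping that the patch manages by hand around each readObject call.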