From 0ca13bbc096e52b2e26dca0ad005361e08c9e016 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Fri, 2 May 2014 14:50:07 -0700 Subject: [PATCH] SPARK-1700: Close socket file descriptors on task completion This will ensure that sockets do not build up over the course of a job, and that cancellation successfully cleans up sockets. Tested in standalone mode. More file descriptors spawn than expected (around 1000ish rather than the expected 8ish) but they do not pile up between runs, or as high as before (where they went up to around 5k). --- .../scala/org/apache/spark/api/python/PythonRDD.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 672c344a5659..61407007087c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val startTime = System.currentTimeMillis val env = SparkEnv.get - val worker = env.createPythonWorker(pythonExec, envVars.toMap) + val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) + + // Ensure worker socket is closed on task completion. Closing sockets is idempotent. + context.addOnCompleteCallback(() => + try { + worker.close() + } catch { + case e: Exception => logWarning("Failed to close worker socket", e) + } + ) @volatile var readerException: Exception = null