
Commit d2cbd3d

SPARK-1700: Close socket file descriptors on task completion
This will ensure that sockets do not build up over the course of a job, and that cancellation successfully cleans up sockets.

Tested in standalone mode. More file descriptors spawn than expected (around 1000ish rather than the expected 8ish), but they do not pile up between runs, or as high as before (where they went up to around 5k).

Author: Aaron Davidson <[email protected]>

Closes #623 from aarondav/pyspark2 and squashes the following commits:

0ca13bb [Aaron Davidson] SPARK-1700: Close socket file descriptors on task completion

(cherry picked from commit 0a14421)
Signed-off-by: Aaron Davidson <[email protected]>
1 parent a314342 commit d2cbd3d

File tree

1 file changed: +10 −1 lines changed


core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 10 additions & 1 deletion
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+    context.addOnCompleteCallback(() =>
+      try {
+        worker.close()
+      } catch {
+        case e: Exception => logWarning("Failed to close worker socket", e)
+      }
+    )

     @volatile var readerException: Exception = null
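The pattern in this patch can be sketched in isolation: register a cleanup callback that closes the worker socket when the task completes, relying on the fact that closing a `java.net.Socket` twice is a no-op. This is a minimal, self-contained sketch; `FakeTaskContext` and `SocketCleanupSketch` are hypothetical stand-ins, not Spark APIs (the real `TaskContext.addOnCompleteCallback` lived in Spark core at the time of this commit).

```scala
import java.net.Socket
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the callback registry that Spark's
// TaskContext provides to tasks.
class FakeTaskContext {
  private val callbacks = ArrayBuffer.empty[() => Unit]
  def addOnCompleteCallback(f: () => Unit): Unit = callbacks += f
  def markTaskCompleted(): Unit = callbacks.foreach(cb => cb())
}

object SocketCleanupSketch {
  def main(args: Array[String]): Unit = {
    val context = new FakeTaskContext
    // Unconnected socket standing in for the Python worker socket.
    val worker: Socket = new Socket()

    // Mirror of the patch: close the worker socket on task completion,
    // swallowing (and logging) any close failure.
    context.addOnCompleteCallback(() =>
      try {
        worker.close()
      } catch {
        case e: Exception => println(s"Failed to close worker socket: $e")
      }
    )

    context.markTaskCompleted()
    println(s"worker closed: ${worker.isClosed}")
    worker.close() // idempotent: closing an already-closed socket is a no-op
  }
}
```

Because `Socket.close()` is idempotent, it is safe for the completion callback to run even on code paths where the socket was already closed, which is what makes this cleanup hook safe for both normal completion and cancellation.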