Skip to content

Commit 047a31b

Browse files
zsxwing authored and davies committed
[SPARK-12617] [PYSPARK] Clean up the leak sockets of Py4J
This patch adds Py4jCallbackConnectionCleaner, which scans for and closes the sockets leaked by Py4J every 30 seconds. This is a workaround until Py4J fixes the leak issue (py4j/py4j#187). Author: Shixiong Zhu <[email protected]>. Closes #10579 from zsxwing/SPARK-12617.
1 parent d202ad2 commit 047a31b

File tree

1 file changed

+61
-0
lines changed

1 file changed

+61
-0
lines changed

python/pyspark/context.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,64 @@
5454
}
5555

5656

57+
class Py4jCallbackConnectionCleaner(object):

    """
    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
    It will scan all callback connections every 30 seconds and close the dead connections.

    The scan runs on a daemon ``threading.Timer`` that re-arms itself after every pass
    until :meth:`stop` is called. Once stopped, a cleaner cannot be restarted.
    """

    def __init__(self, gateway):
        """
        :param gateway: the Py4J gateway whose ``_callback_server`` connections are scanned.
        """
        self._gateway = gateway
        self._stopped = False
        self._timer = None
        # Guards _stopped and _timer, which are touched by both the caller's
        # thread and the timer thread.
        self._lock = RLock()

    def start(self):
        """
        Schedule the first scan. Does nothing if the cleaner has been stopped
        or has already been started.
        """
        with self._lock:
            # A second start() would otherwise create a parallel timer chain
            # whose first timer could never be cancelled by stop().
            if self._stopped or self._timer is not None:
                return
            self._start_timer(self._clean_and_reschedule)

    def _clean_and_reschedule(self):
        """
        Timer callback: run one scan, then arm the next timer.

        Any error raised by the scan is printed and swallowed so that a single
        failing pass cannot end the periodic cleaning.
        """
        try:
            self._clean_closed_connections()
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            # Always re-arm; _start_timer is a no-op once stop() has been called.
            self._start_timer(self._clean_and_reschedule)

    def _clean_closed_connections(self):
        """
        Close the sockets of every dead callback connection and remove those
        connections from the callback server's connection list.
        """
        callback_server = self._gateway._callback_server
        if callback_server is None:
            # Nothing to clean when the gateway has no callback server
            # (presumably it has not been started yet); try again next round.
            return

        from py4j.java_gateway import quiet_close, quiet_shutdown

        with callback_server.lock:
            # is_alive() is the portable spelling; the camelCase isAlive()
            # alias of threading.Thread was removed in Python 3.9.
            dead_connections = [connection for connection in callback_server.connections
                                if not connection.is_alive()]
            for connection in dead_connections:
                quiet_close(connection.input)
                quiet_shutdown(connection.socket)
                quiet_close(connection.socket)
                callback_server.connections.remove(connection)

    def _start_timer(self, f):
        """
        Arm a 30 second daemon timer that calls ``f``, unless the cleaner is stopped.

        :param f: zero-argument callable to run when the timer fires.
        """
        from threading import Timer

        with self._lock:
            if not self._stopped:
                self._timer = Timer(30.0, f)
                # Daemon thread, so a pending timer never blocks interpreter exit.
                self._timer.daemon = True
                self._timer.start()

    def stop(self):
        """
        Stop the cleaner permanently and cancel the pending timer, if any.
        """
        with self._lock:
            self._stopped = True
            if self._timer:
                self._timer.cancel()
                self._timer = None
113+
114+
57115
class SparkContext(object):
58116

59117
"""
@@ -68,6 +126,7 @@ class SparkContext(object):
68126
_active_spark_context = None
69127
_lock = RLock()
70128
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
129+
_py4j_cleaner = None
71130

72131
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
73132

@@ -244,6 +303,8 @@ def _ensure_initialized(cls, instance=None, gateway=None):
244303
if not SparkContext._gateway:
245304
SparkContext._gateway = gateway or launch_gateway()
246305
SparkContext._jvm = SparkContext._gateway.jvm
306+
_py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
307+
_py4j_cleaner.start()
247308

248309
if instance:
249310
if (SparkContext._active_spark_context and

0 commit comments

Comments
 (0)