5454}
5555
5656
class Py4jCallbackConnectionCleaner(object):

    """
    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
    It will scan all callback connections every 30 seconds and close the dead connections.

    Call :meth:`start` once to begin the periodic scan and :meth:`stop` to end it.
    After :meth:`stop` has been called the cleaner cannot be restarted.
    """

    # Seconds to wait between two consecutive scans of the callback connections.
    _CLEAN_INTERVAL_SECONDS = 30.0

    def __init__(self, gateway):
        """
        :param gateway: the Py4j gateway whose ``_callback_server`` connections are scanned.
        """
        self._gateway = gateway
        self._stopped = False
        self._timer = None
        # Guards `_stopped` and `_timer`, which are touched from the timer thread as well.
        self._lock = RLock()

    def start(self):
        """
        Schedule the periodic cleaning. Does nothing if the cleaner was already stopped.
        """
        if self._stopped:
            return

        def clean_closed_connections():
            try:
                callback_server = self._gateway._callback_server
                # The gateway may have no callback server; then there is nothing to scan.
                if callback_server is not None:
                    self._close_dead_connections(callback_server)
            except Exception:
                import traceback
                traceback.print_exc()
            finally:
                # Always re-arm the timer, so one failed scan does not end the cleaning.
                self._start_timer(clean_closed_connections)

        self._start_timer(clean_closed_connections)

    @staticmethod
    def _close_dead_connections(callback_server):
        """
        Close and drop every connection of `callback_server` that is no longer alive.
        """
        from py4j.java_gateway import quiet_close, quiet_shutdown

        with callback_server.lock:
            # Use `is_alive()`: the camelCase `isAlive()` alias no longer exists on
            # `threading.Thread` in recent Python versions.
            # NOTE(review): assumes connections expose the Thread liveness API - confirm.
            dead_connections = [connection for connection in callback_server.connections
                                if not connection.is_alive()]

            for connection in dead_connections:
                quiet_close(connection.input)
                quiet_shutdown(connection.socket)
                quiet_close(connection.socket)

            # Remove only after collecting, so the container is not mutated while iterated.
            for connection in dead_connections:
                callback_server.connections.remove(connection)

    def _start_timer(self, f):
        """
        Arm a one-shot timer that calls `f` after the cleaning interval, unless stopped.

        :param f: zero-argument callable to run on the timer thread.
        """
        from threading import Timer

        with self._lock:
            if not self._stopped:
                self._timer = Timer(self._CLEAN_INTERVAL_SECONDS, f)
                # Daemon thread: a pending timer must not keep the interpreter alive at exit.
                self._timer.daemon = True
                self._timer.start()

    def stop(self):
        """
        Stop the cleaner and cancel the pending timer, if any.
        """
        with self._lock:
            self._stopped = True
            if self._timer:
                self._timer.cancel()
                self._timer = None
112+
113+
57114class SparkContext (object ):
58115
59116 """
@@ -68,6 +125,7 @@ class SparkContext(object):
68125 _active_spark_context = None
69126 _lock = RLock ()
70127 _python_includes = None # zip and egg files that need to be added to PYTHONPATH
128+ _py4j_cleaner = None
71129
72130 PACKAGE_EXTENSIONS = ('.zip' , '.egg' , '.jar' )
73131
@@ -244,6 +302,8 @@ def _ensure_initialized(cls, instance=None, gateway=None):
244302 if not SparkContext ._gateway :
245303 SparkContext ._gateway = gateway or launch_gateway ()
246304 SparkContext ._jvm = SparkContext ._gateway .jvm
305+ _py4j_cleaner = Py4jCallbackConnectionCleaner (SparkContext ._gateway )
306+ _py4j_cleaner .start ()
247307
248308 if instance :
249309 if (SparkContext ._active_spark_context and
0 commit comments