python/pyspark/context.py: 61 additions & 0 deletions

@@ -54,6 +54,64 @@
}


class Py4jCallbackConnectionCleaner(object):

    """
    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
    It will scan all callback connections every 30 seconds and close the dead connections.
    """

    def __init__(self, gateway):
        self._gateway = gateway
        self._stopped = False
        self._timer = None
        self._lock = RLock()

    def start(self):
        if self._stopped:
            return

        def clean_closed_connections():
            from py4j.java_gateway import quiet_close, quiet_shutdown

            callback_server = self._gateway._callback_server
            with callback_server.lock:
Contributor commented on the line above:
    Traceback (most recent call last):
      File "//anaconda/lib/python2.7/threading.py", line 810, in __bootstrap_inner
        self.run()
      File "//anaconda/lib/python2.7/threading.py", line 1082, in run
        self.function(*self.args, **self.kwargs)
      File "/Users/davies/work/spark/python/pyspark/context.py", line 78, in clean_closed_connections
        with callback_server.lock:
    AttributeError: 'NoneType' object has no attribute 'lock'

Member Author replied:
Sorry that I didn't notice that only Streaming uses the callback server. I sent #10621 to fix it.
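
For readers following the thread, a minimal sketch of the kind of guard that avoids this crash (the actual fix is in #10621; this is an illustration only, assuming the callback server stays None until Streaming starts it):

    callback_server = self._gateway._callback_server
    if callback_server is None:
        # No callback server yet (only Streaming starts one); skip this
        # scan and re-arm the timer for the next attempt.
        self._start_timer(clean_closed_connections)
        return
    with callback_server.lock:
        ...  # proceed with the scan as in the patch below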

                try:
                    closed_connections = []
                    for connection in callback_server.connections:
                        if not connection.isAlive():
                            quiet_close(connection.input)
                            quiet_shutdown(connection.socket)
                            quiet_close(connection.socket)
                            closed_connections.append(connection)

                    for closed_connection in closed_connections:
                        callback_server.connections.remove(closed_connection)
                except Exception:
                    import traceback
                    traceback.print_exc()

            self._start_timer(clean_closed_connections)

        self._start_timer(clean_closed_connections)

    def _start_timer(self, f):
        from threading import Timer

        with self._lock:
            if not self._stopped:
                self._timer = Timer(30.0, f)
                self._timer.daemon = True
                self._timer.start()

    def stop(self):
Member Author commented on stop():
This method is not called because PySpark never stops the gateway server.
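
(Illustration only, not part of this PR: if teardown were ever wired up, one option would be an atexit hook, assuming the cleaner were stored on the class attribute _py4j_cleaner.)

    import atexit

    def _stop_py4j_cleaner():
        # Hypothetical hook; PySpark does not register this today.
        cleaner = SparkContext._py4j_cleaner
        if cleaner is not None:
            cleaner.stop()

    atexit.register(_stop_py4j_cleaner)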

        with self._lock:
            self._stopped = True
            if self._timer:
                self._timer.cancel()
                self._timer = None


class SparkContext(object):

    """
@@ -68,6 +126,7 @@ class SparkContext(object):
    _active_spark_context = None
    _lock = RLock()
    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
    _py4j_cleaner = None

    PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

@@ -244,6 +303,8 @@ def _ensure_initialized(cls, instance=None, gateway=None):
        if not SparkContext._gateway:
            SparkContext._gateway = gateway or launch_gateway()
            SparkContext._jvm = SparkContext._gateway.jvm
            _py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
            _py4j_cleaner.start()

        if instance:
            if (SparkContext._active_spark_context and
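
For context, the cleaner starts as a side effect of creating the first SparkContext, since that is what launches the gateway; a minimal usage sketch (standard PySpark API, the app name is arbitrary):

    from pyspark import SparkContext

    # Creating the first context calls _ensure_initialized(), which launches
    # the Py4J gateway and, with this patch, starts the
    # Py4jCallbackConnectionCleaner in the background.
    sc = SparkContext("local", "demo")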