Skip to content
27 changes: 20 additions & 7 deletions pymongo/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1596,8 +1596,14 @@ def _process_kill_cursors(self):
try:
self._cleanup_cursor(True, cursor_id, address, sock_mgr,
None, False)
except Exception:
helpers._handle_exception()
except Exception as exc:
if (isinstance(exc, InvalidOperation)
and self._topology._closed):
# Raise the exception when client is closed so that it
# can be caught in _process_periodic_tasks
raise
else:
helpers._handle_exception()

# Don't re-open topology if it's closed and there's no pending cursors.
if address_to_cursor_ids:
Expand All @@ -1606,18 +1612,25 @@ def _process_kill_cursors(self):
try:
self._kill_cursors(
cursor_ids, address, topology, session=None)
except Exception:
helpers._handle_exception()
except Exception as exc:
if (isinstance(exc, InvalidOperation) and
self._topology._closed):
raise
else:
helpers._handle_exception()

# This method is run periodically by a background thread.
def _process_periodic_tasks(self):
"""Process any pending kill cursors requests and
maintain connection pool parameters."""
self._process_kill_cursors()
try:
self._process_kill_cursors()
self._topology.update_pool(self.__all_credentials)
except Exception:
helpers._handle_exception()
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
return
else:
helpers._handle_exception()

def __start_session(self, implicit, **kwargs):
# Raises ConfigurationError if sessions are not supported.
Expand Down
19 changes: 19 additions & 0 deletions test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,24 @@ def test_network_error_message(self):
with self.assertRaisesRegex(AutoReconnect, expected):
client.pymongo_test.test.find_one({})

@unittest.skipIf('PyPy' in sys.version, 'PYTHON-2938 could fail on PyPy')
def test_process_periodic_tasks(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this test will fail on PyPy because garbage collection works slightly differently. Let's skip this test entirely when running with PyPy (see my current PR for how to do that)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

client = rs_or_single_client()
coll = client.db.collection
coll.insert_many([{} for _ in range(5)])
cursor = coll.find(batch_size=2)
cursor.next()
c_id = cursor.cursor_id
self.assertIsNotNone(c_id)
client.close()
# Add cursor to kill cursors queue
del cursor
wait_until(lambda: client._MongoClient__kill_cursors_queue,
"waited for cursor to be added to queue")
client._process_periodic_tasks() # This must not raise or print any exceptions
with self.assertRaises(InvalidOperation):
coll.insert_many([{} for _ in range(5)])

@unittest.skipUnless(
_HAVE_DNSPYTHON, "DNS-related tests need dnspython to be installed")
def test_service_name_from_kwargs(self):
Expand All @@ -1613,6 +1631,7 @@ def test_service_name_from_kwargs(self):
'customname')



Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need the try/except/fail here. Just call client._process_periodic_tasks(). If that raises unexpectedly the teat will fail automatically.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

class TestExhaustCursor(IntegrationTest):
"""Test that clients properly handle errors from exhaust cursors."""

Expand Down