diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index d1794938f7..279eae5587 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1596,8 +1596,14 @@ def _process_kill_cursors(self): try: self._cleanup_cursor(True, cursor_id, address, sock_mgr, None, False) - except Exception: - helpers._handle_exception() + except Exception as exc: + if (isinstance(exc, InvalidOperation) + and self._topology._closed): + # Raise the exception when client is closed so that it + # can be caught in _process_periodic_tasks + raise + else: + helpers._handle_exception() # Don't re-open topology if it's closed and there's no pending cursors. if address_to_cursor_ids: @@ -1606,18 +1612,25 @@ def _process_kill_cursors(self): try: self._kill_cursors( cursor_ids, address, topology, session=None) - except Exception: - helpers._handle_exception() + except Exception as exc: + if (isinstance(exc, InvalidOperation) and + self._topology._closed): + raise + else: + helpers._handle_exception() # This method is run periodically by a background thread. def _process_periodic_tasks(self): """Process any pending kill cursors requests and maintain connection pool parameters.""" - self._process_kill_cursors() try: + self._process_kill_cursors() self._topology.update_pool(self.__all_credentials) - except Exception: - helpers._handle_exception() + except Exception as exc: + if isinstance(exc, InvalidOperation) and self._topology._closed: + return + else: + helpers._handle_exception() def __start_session(self, implicit, **kwargs): # Raises ConfigurationError if sessions are not supported. diff --git a/test/test_client.py b/test/test_client.py index 4d8b26400a..aee95692c6 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1591,6 +1591,24 @@ def test_network_error_message(self): with self.assertRaisesRegex(AutoReconnect, expected): client.pymongo_test.test.find_one({}) + @unittest.skipIf('PyPy' in sys.version, 'PYTHON-2938 could fail on PyPy') + def test_process_periodic_tasks(self): + client = rs_or_single_client() + coll = client.db.collection + coll.insert_many([{} for _ in range(5)]) + cursor = coll.find(batch_size=2) + cursor.next() + c_id = cursor.cursor_id + self.assertIsNotNone(c_id) + client.close() + # Add cursor to kill cursors queue + del cursor + wait_until(lambda: client._MongoClient__kill_cursors_queue, + "waited for cursor to be added to queue") + client._process_periodic_tasks() # This must not raise or print any exceptions + with self.assertRaises(InvalidOperation): + coll.insert_many([{} for _ in range(5)]) + @unittest.skipUnless( _HAVE_DNSPYTHON, "DNS-related tests need dnspython to be installed") def test_service_name_from_kwargs(self): @@ -1613,6 +1631,7 @@ def test_service_name_from_kwargs(self): 'customname') + class TestExhaustCursor(IntegrationTest): """Test that clients properly handle errors from exhaust cursors."""