From 0c7f8702300f51b32c4ca3684ac6a60e55a025ca Mon Sep 17 00:00:00 2001 From: Yorick van Pelt Date: Wed, 7 May 2025 11:57:02 +0200 Subject: [PATCH 1/3] Fix: PineconeGrpcFuture blocks during construction When using grpc with async_req, the construction of a PineconeGrpcFuture would call _sync_state, which would do a blocking call to grpc_future.exception(...). This means that the async_reqs were all blocking in practice. We can fix it by checking if the future is still running and not doing any blocking calls when it is. --- pinecone/grpc/future.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index d98e1e84..2a4ae7c6 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -30,18 +30,16 @@ def _sync_state(self, grpc_future): if self.done(): return - if grpc_future.cancelled(): + if grpc_future.running(): + self.set_running_or_notify_cancel() + elif grpc_future.cancelled(): self.cancel() - elif grpc_future.exception(timeout=self._default_timeout): - self.set_exception(grpc_future.exception()) elif grpc_future.done(): try: result = grpc_future.result(timeout=self._default_timeout) self.set_result(result) except Exception as e: self.set_exception(e) - elif grpc_future.running(): - self.set_running_or_notify_cancel() def set_result(self, result): if self._result_transformer: From fbb76d190bd0f6ea26f042c792a80c1be26dfdbe Mon Sep 17 00:00:00 2001 From: Yorick van Pelt Date: Mon, 19 May 2025 15:34:12 +0200 Subject: [PATCH 2/3] Attempt to cancel future if applicable, deal with multiple _sync_state calls --- pinecone/grpc/future.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pinecone/grpc/future.py b/pinecone/grpc/future.py index 2a4ae7c6..14fe9cf0 100644 --- a/pinecone/grpc/future.py +++ b/pinecone/grpc/future.py @@ -30,8 +30,9 @@ def _sync_state(self, grpc_future): if self.done(): return - if grpc_future.running(): - self.set_running_or_notify_cancel() + if grpc_future.running() and not self.running(): + if not self.set_running_or_notify_cancel(): + grpc_future.cancel() elif grpc_future.cancelled(): self.cancel() elif grpc_future.done(): From 15e3fa1dd6b77721c6302a479f526fa4bbdae95c Mon Sep 17 00:00:00 2001 From: Yorick van Pelt Date: Mon, 19 May 2025 15:34:39 +0200 Subject: [PATCH 3/3] grpc unit test: throw exception from result mock --- tests/unit_grpc/test_futures.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unit_grpc/test_futures.py b/tests/unit_grpc/test_futures.py index 1f8e70c0..43960b00 100644 --- a/tests/unit_grpc/test_futures.py +++ b/tests/unit_grpc/test_futures.py @@ -15,6 +15,8 @@ def mock_grpc_future( grpc_future.exception.return_value = exception grpc_future.running.return_value = running grpc_future.result.return_value = result + if exception: + grpc_future.result.side_effect = exception return grpc_future @@ -309,6 +311,7 @@ def test_result_catch_grpc_exceptions(self, mocker): def test_exception_when_done_maps_grpc_exception(self, mocker): grpc_future = mock_grpc_future(mocker, done=True) grpc_future.exception.return_value = FakeGrpcError(mocker) + grpc_future.result.side_effect = grpc_future.exception.return_value future = PineconeGrpcFuture(grpc_future) @@ -467,6 +470,7 @@ def test_concurrent_futures_wait_first_exception(self, mocker): grpc_future2 = mock_grpc_future(mocker, done=True) grpc_future2.exception.return_value = Exception("Simulated gRPC error") + grpc_future2.result.side_effect = grpc_future2.exception.return_value future2 = PineconeGrpcFuture(grpc_future2) from concurrent.futures import wait, FIRST_EXCEPTION