diff --git a/nats/src/nats/js/api.py b/nats/src/nats/js/api.py index a11959f48..9f3bf76e9 100644 --- a/nats/src/nats/js/api.py +++ b/nats/src/nats/js/api.py @@ -492,6 +492,11 @@ class ConsumerConfig(Base): # Metadata are user defined string key/value pairs. metadata: Optional[Dict[str, str]] = None + # Consumer pause until timestamp. + # Temporarily suspend message delivery until the specified time (RFC 3339 format). + # Introduced in nats-server 2.11.0. + pause_until: Optional[str] = None + @classmethod def from_response(cls, resp: Dict[str, Any]): cls._convert_nanoseconds(resp, "ack_wait") @@ -538,6 +543,12 @@ class ConsumerInfo(Base): num_pending: Optional[int] = None cluster: Optional[ClusterInfo] = None push_bound: Optional[bool] = None + # Indicates if the consumer is currently paused. + # Introduced in nats-server 2.11.0. + paused: Optional[bool] = None + # RFC 3339 timestamp until which the consumer is paused. + # Introduced in nats-server 2.11.0. + pause_remaining: Optional[str] = None @classmethod def from_response(cls, resp: Dict[str, Any]): @@ -548,6 +559,18 @@ def from_response(cls, resp: Dict[str, Any]): return super().from_response(resp) +@dataclass +class ConsumerPause(Base): + """ + ConsumerPause represents the pause state after a pause or resume operation. + Introduced in nats-server 2.11.0. + """ + + paused: bool + pause_until: Optional[str] = None + pause_remaining: Optional[str] = None + + @dataclass class AccountLimits(Base): """Account limits diff --git a/nats/src/nats/js/manager.py b/nats/src/nats/js/manager.py index 33f29170c..59f6e5f86 100644 --- a/nats/src/nats/js/manager.py +++ b/nats/src/nats/js/manager.py @@ -235,6 +235,67 @@ async def delete_consumer(self, stream: str, consumer: str) -> bool: ) return resp["success"] + async def pause_consumer( + self, + stream: str, + consumer: str, + pause_until: str, + timeout: Optional[float] = None, + ) -> api.ConsumerPause: + """ + Pause a consumer until the specified time. + + Args: + stream: The stream name + consumer: The consumer name + pause_until: RFC 3339 timestamp string (e.g., "2025-10-22T12:00:00Z") + until which the consumer should be paused + timeout: Request timeout in seconds + + Returns: + ConsumerPause with paused status + + Note: + Requires nats-server 2.11.0 or later + """ + if timeout is None: + timeout = self._timeout + + req = {"pause_until": pause_until} + req_data = json.dumps(req).encode() + + resp = await self._api_request( + f"{self._prefix}.CONSUMER.PAUSE.{stream}.{consumer}", + req_data, + timeout=timeout, + ) + return api.ConsumerPause.from_response(resp) + + async def resume_consumer( + self, + stream: str, + consumer: str, + timeout: Optional[float] = None, + ) -> api.ConsumerPause: + """ + Resume a paused consumer immediately. + + This is equivalent to calling pause_consumer with a timestamp in the past. + + Args: + stream: The stream name + consumer: The consumer name + timeout: Request timeout in seconds + + Returns: + ConsumerPause with paused=False + + Note: + Requires nats-server 2.11.0 or later + """ + # Resume by pausing until a time in the past (epoch) + return await self.pause_consumer(stream, consumer, "1970-01-01T00:00:00Z", timeout) + async def consumers_info(self, stream: str, offset: Optional[int] = None) -> List[api.ConsumerInfo]: """ consumers_info retrieves a list of consumers. Consumers list limit is 256 for more diff --git a/nats/tests/test_js.py b/nats/tests/test_js.py index 10a88cca4..f27cab521 100644 --- a/nats/tests/test_js.py +++ b/nats/tests/test_js.py @@ -1503,6 +1503,137 @@ async def test_jsm_stream_info_options(self): assert si.state.subjects == None +class ConsumerPauseResumeTest(SingleJetStreamServerTestCase): + @async_test + async def test_consumer_pause_and_resume(self): + """Test pausing and resuming a consumer""" + nc = NATS() + await nc.connect() + + server_version = nc.connected_server_version + if server_version.major == 2 and server_version.minor < 11: + pytest.skip("consumer pause/resume requires nats-server v2.11.0 or later") + + js = nc.jetstream() + jsm = nc.jsm() + + # Create a stream + await jsm.add_stream(name="PAUSETEST", subjects=["pause.test"]) + + # Publish some messages + for i in range(5): + await js.publish("pause.test", f"msg-{i}".encode()) + + # Create a pull consumer + consumer_name = "pause-consumer" + await jsm.add_consumer( + "PAUSETEST", + name=consumer_name, + durable_name=consumer_name, + ack_policy="explicit", + ) + + # Get initial consumer info - may or may not be paused initially + # (we'll test pausing anyway) + initial_cinfo = await jsm.consumer_info("PAUSETEST", consumer_name) + + # Pause the consumer until a future time (1 hour from now) + from datetime import datetime, timedelta, timezone + + pause_until = (datetime.now(timezone.utc) + timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ") + + pause_resp = await jsm.pause_consumer("PAUSETEST", consumer_name, pause_until) + assert pause_resp.paused is True + assert pause_resp.pause_remaining is not None + + # Verify consumer is still paused when we check info + cinfo = await jsm.consumer_info("PAUSETEST", consumer_name) + assert cinfo.paused is True + + # Resume the consumer + resume_resp = await jsm.resume_consumer("PAUSETEST", consumer_name) + assert resume_resp.paused is False + + # Verify consumer can now receive messages + sub = await js.pull_subscribe_bind(consumer_name, "PAUSETEST") + msgs = await sub.fetch(1, timeout=2) + assert len(msgs) == 1 + # Message should be one of our published messages + assert msgs[0].data in [b"msg-0", b"msg-1", b"msg-2", b"msg-3", b"msg-4"] + await msgs[0].ack() + + await nc.close() + + @async_test + async def test_consumer_pause_until_in_config(self): + """Test creating a consumer with pause_until in config""" + nc = NATS() + await nc.connect() + + server_version = nc.connected_server_version + if server_version.major == 2 and server_version.minor < 11: + pytest.skip("consumer pause/resume requires nats-server v2.11.0 or later") + + js = nc.jetstream() + jsm = nc.jsm() + + # Create a stream + await jsm.add_stream(name="PAUSECONFIG", subjects=["pause.config"]) + + # Publish a message + await js.publish("pause.config", b"test message") + + # Create a consumer with pause_until in the config + from datetime import datetime, timedelta, timezone + + pause_until = (datetime.now(timezone.utc) + timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ") + + consumer_config = nats.js.api.ConsumerConfig( + name="paused-consumer", + durable_name="paused-consumer", + ack_policy="explicit", + pause_until=pause_until, + ) + + cinfo = await jsm.add_consumer("PAUSECONFIG", config=consumer_config) + assert cinfo.paused is True + # The server may round or adjust the pause_until time slightly + assert cinfo.config.pause_until is not None + + await nc.close() + + @async_test + async def test_consumer_pause_with_immediate_expiry(self): + """Test pausing a consumer with an immediate expiry (effectively resume)""" + nc = NATS() + await nc.connect() + + server_version = nc.connected_server_version + if server_version.major == 2 and server_version.minor < 11: + pytest.skip("consumer pause/resume requires nats-server v2.11.0 or later") + + js = nc.jetstream() + jsm = nc.jsm() + + # Create a stream + await jsm.add_stream(name="PAUSEIMMEDIATE", subjects=["pause.immediate"]) + + # Create a consumer + consumer_name = "immediate-consumer" + await jsm.add_consumer( + "PAUSEIMMEDIATE", + name=consumer_name, + durable_name=consumer_name, + ack_policy="explicit", + ) + + # Pause with a time in the past (epoch) - should effectively resume + resume_resp = await jsm.pause_consumer("PAUSEIMMEDIATE", consumer_name, "1970-01-01T00:00:00Z") + assert resume_resp.paused is False + + await nc.close() + + class SubscribeTest(SingleJetStreamServerTestCase): @async_test async def test_queue_subscribe_deliver_group(self):