diff --git a/nats/src/nats/js/api.py b/nats/src/nats/js/api.py index cdd254db..170839a6 100644 --- a/nats/src/nats/js/api.py +++ b/nats/src/nats/js/api.py @@ -283,6 +283,28 @@ def as_dict(self) -> Dict[str, object]: return result +@dataclass +class StreamConsumerLimits(Base): + """ + StreamConsumerLimits are the limits for consumers on a stream. + These limits apply to newly created consumers and set default constraints. + Introduced in nats-server 2.10.0. + """ + + inactive_threshold: Optional[float] = None # in seconds + max_ack_pending: Optional[int] = None + + @classmethod + def from_response(cls, resp: Dict[str, Any]): + cls._convert_nanoseconds(resp, "inactive_threshold") + return super().from_response(resp) + + def as_dict(self) -> Dict[str, object]: + result = super().as_dict() + result["inactive_threshold"] = self._to_nanoseconds(self.inactive_threshold) + return result + + @dataclass class StreamConfig(Base): """ @@ -346,6 +368,9 @@ class StreamConfig(Base): # Metadata are user defined string key/value pairs. metadata: Optional[Dict[str, str]] = None + # Consumer limits for this stream. Introduced in nats-server 2.10.0. + consumer_limits: Optional[StreamConsumerLimits] = None + @classmethod def from_response(cls, resp: Dict[str, Any]): cls._convert_nanoseconds(resp, "max_age") @@ -355,6 +380,7 @@ def from_response(cls, resp: Dict[str, Any]): cls._convert(resp, "sources", StreamSource) cls._convert(resp, "republish", RePublish) cls._convert(resp, "subject_transform", SubjectTransform) + cls._convert(resp, "consumer_limits", StreamConsumerLimits) return super().from_response(resp) def as_dict(self) -> Dict[str, object]: diff --git a/nats/tests/test_js.py b/nats/tests/test_js.py index 1b51edd5..879c3a06 100644 --- a/nats/tests/test_js.py +++ b/nats/tests/test_js.py @@ -4902,6 +4902,65 @@ async def test_fetch_pull_subscribe_bind(self): await nc.close() + @async_test + async def test_stream_consumer_limits(self): + nc = await nats.connect() + + js = nc.jetstream() + + # Create a stream with consumer_limits + consumer_limits = nats.js.api.StreamConsumerLimits( + inactive_threshold=3600.0, # 1 hour in seconds + max_ack_pending=1000, + ) + + await js.add_stream( + name="CONSUMERLIMITS", + subjects=["consumerlimits.test"], + consumer_limits=consumer_limits, + ) + + # Verify stream info returns the consumer_limits + sinfo = await js.stream_info("CONSUMERLIMITS") + assert sinfo.config.consumer_limits is not None + assert sinfo.config.consumer_limits.inactive_threshold == 3600.0 + assert sinfo.config.consumer_limits.max_ack_pending == 1000 + + # Create a consumer and verify it respects the limits + # When consumer doesn't specify max_ack_pending, it should use stream's limit + await js.add_consumer( + "CONSUMERLIMITS", + config=nats.js.api.ConsumerConfig( + durable_name="consumer1", + ), + ) + + cinfo = await js.consumer_info("CONSUMERLIMITS", "consumer1") + # The consumer should have inherited or be constrained by stream limits + assert cinfo.config.max_ack_pending is not None + + # Test with only max_ack_pending set + await js.add_stream( + name="MAXACKONLY", + subjects=["consumerlimits.maxack"], + consumer_limits=nats.js.api.StreamConsumerLimits(max_ack_pending=500), + ) + sinfo = await js.stream_info("MAXACKONLY") + assert sinfo.config.consumer_limits.max_ack_pending == 500 + assert sinfo.config.consumer_limits.inactive_threshold is None + + # Test with only inactive_threshold set + await js.add_stream( + name="INACTIVETHRESHOLDONLY", + subjects=["consumerlimits.inactive"], + consumer_limits=nats.js.api.StreamConsumerLimits(inactive_threshold=7200.0), + ) + sinfo = await js.stream_info("INACTIVETHRESHOLDONLY") + assert sinfo.config.consumer_limits.inactive_threshold == 7200.0 + assert sinfo.config.consumer_limits.max_ack_pending is None + + await nc.close() + class BadStreamNamesTest(SingleJetStreamServerTestCase): @async_test