Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions nats/src/nats/js/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,28 @@ def as_dict(self) -> Dict[str, object]:
return result


@dataclass
class StreamConsumerLimits(Base):
"""
StreamConsumerLimits are the limits for consumers on a stream.
These limits apply to newly created consumers and set default constraints.
Introduced in nats-server 2.10.0.
"""

inactive_threshold: Optional[float] = None # in seconds
max_ack_pending: Optional[int] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, "inactive_threshold")
return super().from_response(resp)

def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result["inactive_threshold"] = self._to_nanoseconds(self.inactive_threshold)
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The as_dict method has a bug: it unconditionally sets inactive_threshold even when it's None. The _to_nanoseconds method converts None to 0 (line 85), so when inactive_threshold is None, it will be sent as 0 to the server instead of being omitted.

This breaks the test case on line 4946 where only max_ack_pending is set - the server will receive inactive_threshold=0 instead of it being omitted.

The fix should conditionally set inactive_threshold only when it's not None:

def as_dict(self) -> Dict[str, object]:
    result = super().as_dict()
    if self.inactive_threshold is not None:
        result["inactive_threshold"] = self._to_nanoseconds(self.inactive_threshold)
    return result

This pattern is consistent with how other classes handle optional nanosecond fields (e.g., see ConsumerConfig.as_dict() lines 581-588).

Suggested change
result["inactive_threshold"] = self._to_nanoseconds(self.inactive_threshold)
if self.inactive_threshold is not None:
result["inactive_threshold"] = self._to_nanoseconds(self.inactive_threshold)

Copilot uses AI. Check for mistakes.
return result


@dataclass
class StreamConfig(Base):
"""
Expand Down Expand Up @@ -346,6 +368,9 @@ class StreamConfig(Base):
# Metadata are user defined string key/value pairs.
metadata: Optional[Dict[str, str]] = None

# Consumer limits for this stream. Introduced in nats-server 2.10.0.
consumer_limits: Optional[StreamConsumerLimits] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, "max_age")
Expand All @@ -355,6 +380,7 @@ def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "sources", StreamSource)
cls._convert(resp, "republish", RePublish)
cls._convert(resp, "subject_transform", SubjectTransform)
cls._convert(resp, "consumer_limits", StreamConsumerLimits)
return super().from_response(resp)

def as_dict(self) -> Dict[str, object]:
Expand Down
59 changes: 59 additions & 0 deletions nats/tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -4902,6 +4902,65 @@ async def test_fetch_pull_subscribe_bind(self):

await nc.close()

@async_test
async def test_stream_consumer_limits(self):
nc = await nats.connect()

js = nc.jetstream()

# Create a stream with consumer_limits
consumer_limits = nats.js.api.StreamConsumerLimits(
inactive_threshold=3600.0, # 1 hour in seconds
max_ack_pending=1000,
)

await js.add_stream(
name="CONSUMERLIMITS",
subjects=["consumerlimits.test"],
consumer_limits=consumer_limits,
)

# Verify stream info returns the consumer_limits
sinfo = await js.stream_info("CONSUMERLIMITS")
assert sinfo.config.consumer_limits is not None
assert sinfo.config.consumer_limits.inactive_threshold == 3600.0
assert sinfo.config.consumer_limits.max_ack_pending == 1000

# Create a consumer and verify it respects the limits
# When consumer doesn't specify max_ack_pending, it should use stream's limit
await js.add_consumer(
"CONSUMERLIMITS",
config=nats.js.api.ConsumerConfig(
durable_name="consumer1",
),
)

cinfo = await js.consumer_info("CONSUMERLIMITS", "consumer1")
# The consumer should have inherited or be constrained by stream limits
assert cinfo.config.max_ack_pending is not None

# Test with only max_ack_pending set
await js.add_stream(
name="MAXACKONLY",
subjects=["consumerlimits.maxack"],
consumer_limits=nats.js.api.StreamConsumerLimits(max_ack_pending=500),
)
sinfo = await js.stream_info("MAXACKONLY")
assert sinfo.config.consumer_limits.max_ack_pending == 500
assert sinfo.config.consumer_limits.inactive_threshold is None

# Test with only inactive_threshold set
await js.add_stream(
name="INACTIVETHRESHOLDONLY",
subjects=["consumerlimits.inactive"],
consumer_limits=nats.js.api.StreamConsumerLimits(inactive_threshold=7200.0),
)
sinfo = await js.stream_info("INACTIVETHRESHOLDONLY")
assert sinfo.config.consumer_limits.inactive_threshold == 7200.0
assert sinfo.config.consumer_limits.max_ack_pending is None

await nc.close()


class BadStreamNamesTest(SingleJetStreamServerTestCase):
@async_test
Expand Down
Loading