Skip to content

Commit 40f70bf

Browse files
oliverlambsoncaspervonb
authored andcommitted
Add consumer-configured inbox_prefix use to jetstream pull_subscribe methods (#781)
1 parent 8f8736c commit 40f70bf

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

nats/src/nats/js/client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ async def pull_subscribe(
545545
config: Optional[api.ConsumerConfig] = None,
546546
pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
547547
pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
548-
inbox_prefix: bytes = api.INBOX_PREFIX,
548+
inbox_prefix: Optional[bytes] = None,
549549
) -> JetStreamContext.PullSubscription:
550550
"""Create consumer and pull subscription.
551551
@@ -620,7 +620,7 @@ async def pull_subscribe_bind(
620620
self,
621621
consumer: Optional[str] = None,
622622
stream: Optional[str] = None,
623-
inbox_prefix: bytes = api.INBOX_PREFIX,
623+
inbox_prefix: Optional[bytes] = None,
624624
pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
625625
pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
626626
name: Optional[str] = None,
@@ -654,6 +654,10 @@ async def main():
654654
"""
655655
if not stream:
656656
raise ValueError("nats: stream name is required")
657+
658+
if inbox_prefix is None:
659+
inbox_prefix = bytes(self._nc._inbox_prefix[:]) + b"."
660+
657661
deliver = inbox_prefix + self._nc._nuid.next()
658662
sub = await self._nc.subscribe(
659663
deliver.decode(),

nats/tests/test_js.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,42 @@ async def test_add_pull_consumer_via_jsm(self):
406406
info = await js.consumer_info("events", "a")
407407
assert 0 == info.num_pending
408408

409+
@async_test
410+
async def test_pull_subscribe_bind_custom_inbox_prefix(self):
411+
"""Test that pull_subscribe_bind respects custom inbox_prefix from connection."""
412+
nc = NATS()
413+
await nc.connect(inbox_prefix="_INBOX_custom")
414+
415+
js = nc.jetstream()
416+
417+
# Create stream and consumer
418+
await js.add_stream(name="events", subjects=["events.test"])
419+
await js.add_consumer(
420+
"events",
421+
durable_name="test_consumer",
422+
deliver_policy=nats.js.api.DeliverPolicy.ALL,
423+
filter_subject="events.test",
424+
)
425+
426+
# Publish a message
427+
await js.publish("events.test", b"hello")
428+
429+
# pull_subscribe_bind should use the custom inbox prefix by default
430+
sub = await js.pull_subscribe_bind("test_consumer", stream="events")
431+
432+
# Verify the deliver subject uses the custom prefix
433+
assert sub._deliver.startswith("_INBOX_custom."), (
434+
f"Expected deliver subject to start with '_INBOX_custom.' but got: {sub._deliver}"
435+
)
436+
437+
# Verify functionality still works
438+
msgs = await sub.fetch(1)
439+
assert len(msgs) == 1
440+
assert msgs[0].data == b"hello"
441+
await msgs[0].ack()
442+
443+
await nc.close()
444+
409445
@async_long_test
410446
async def test_fetch_n(self):
411447
nc = NATS()

0 commit comments

Comments
 (0)