Skip to content

Commit f78ac08

Browse files
committed
Expose subscription credit settings
Fixes #262
1 parent 5a38ee2 commit f78ac08

File tree

9 files changed

+242
-43
lines changed

9 files changed

+242
-43
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,18 @@ public interface ConsumerBuilder {
151151
*/
152152
ConsumerBuilder noTrackingStrategy();
153153

154+
/**
155+
* Define credit values for the subscription.
156+
*
157+
* <p>This is an experimental API, subject to change.
158+
*
159+
* @param initial the initial number of credits, default is 10
160+
* @param onChunkDelivery the number of credits provided on each chunk delivery, default is 1
161+
* @return this builder instance
162+
* @since 0.10.0
163+
*/
164+
ConsumerBuilder credits(int initial, int onChunkDelivery);
165+
154166
/**
155167
* Create the configured {@link Consumer}
156168
*

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ Runnable subscribe(
118118
SubscriptionListener subscriptionListener,
119119
Runnable trackingClosingCallback,
120120
MessageHandler messageHandler,
121-
Map<String, String> subscriptionProperties) {
121+
Map<String, String> subscriptionProperties,
122+
int initialCredits,
123+
int additionalCredits) {
122124
List<Client.Broker> candidates = findBrokersForStream(stream);
123125
Client.Broker newNode = pickBroker(candidates);
124126
if (newNode == null) {
@@ -137,7 +139,9 @@ Runnable subscribe(
137139
subscriptionListener,
138140
trackingClosingCallback,
139141
messageHandler,
140-
subscriptionProperties);
142+
subscriptionProperties,
143+
initialCredits,
144+
additionalCredits);
141145

142146
try {
143147
addToManager(newNode, subscriptionTracker, offsetSpecification, true);
@@ -387,6 +391,8 @@ private static class SubscriptionTracker {
387391
private volatile ClientSubscriptionsManager manager;
388392
private volatile AtomicReference<SubscriptionState> state =
389393
new AtomicReference<>(SubscriptionState.OPENING);
394+
private final int initialCredits;
395+
private final int additionalCredits;
390396

391397
private SubscriptionTracker(
392398
long id,
@@ -397,7 +403,9 @@ private SubscriptionTracker(
397403
SubscriptionListener subscriptionListener,
398404
Runnable trackingClosingCallback,
399405
MessageHandler messageHandler,
400-
Map<String, String> subscriptionProperties) {
406+
Map<String, String> subscriptionProperties,
407+
int initialCredits,
408+
int additionalCredits) {
401409
this.id = id;
402410
this.consumer = consumer;
403411
this.stream = stream;
@@ -406,6 +414,8 @@ private SubscriptionTracker(
406414
this.subscriptionListener = subscriptionListener;
407415
this.trackingClosingCallback = trackingClosingCallback;
408416
this.messageHandler = messageHandler;
417+
this.initialCredits = initialCredits;
418+
this.additionalCredits = additionalCredits;
409419
if (this.offsetTrackingReference == null) {
410420
this.subscriptionProperties = subscriptionProperties;
411421
} else {
@@ -548,7 +558,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
548558
SubscriptionTracker subscriptionTracker =
549559
subscriptionTrackers.get(subscriptionId & 0xFF);
550560
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
551-
client.credit(subscriptionId, 1);
561+
client.credit(subscriptionId, subscriptionTracker.additionalCredits);
552562
} else {
553563
LOGGER.debug(
554564
"Could not find stream subscription {} or subscription closing, not providing credits",
@@ -943,7 +953,7 @@ synchronized void add(
943953
subId,
944954
subscriptionTracker.stream,
945955
subscriptionContext.offsetSpecification(),
946-
10,
956+
subscriptionTracker.initialCredits,
947957
subscriptionTracker.subscriptionProperties),
948958
RETRY_ON_TIMEOUT,
949959
"Subscribe request for consumer %s on stream '%s'",

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ class StreamConsumer implements Consumer {
8080
boolean lazyInit,
8181
SubscriptionListener subscriptionListener,
8282
Map<String, String> subscriptionProperties,
83-
ConsumerUpdateListener consumerUpdateListener) {
83+
ConsumerUpdateListener consumerUpdateListener,
84+
int initialCredits,
85+
int additionalCredits) {
8486

8587
this.id = ID_SEQUENCE.getAndIncrement();
8688
Runnable trackingClosingCallback;
@@ -253,7 +255,9 @@ class StreamConsumer implements Consumer {
253255
subscriptionListener,
254256
trackingClosingCallback,
255257
closedAwareMessageHandler,
256-
Collections.unmodifiableMap(subscriptionProperties));
258+
Collections.unmodifiableMap(subscriptionProperties),
259+
initialCredits,
260+
additionalCredits);
257261

258262
this.status = Status.RUNNING;
259263
};

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class StreamConsumerBuilder implements ConsumerBuilder {
4242
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
4343
private Map<String, String> subscriptionProperties = new ConcurrentHashMap<>();
4444
private ConsumerUpdateListener consumerUpdateListener;
45+
private int initialCredits = 1;
46+
private int additionalCredits = 1;
4547

4648
public StreamConsumerBuilder(StreamEnvironment environment) {
4749
this.environment = environment;
@@ -130,6 +132,16 @@ public ConsumerBuilder noTrackingStrategy() {
130132
return this;
131133
}
132134

135+
@Override
136+
public ConsumerBuilder credits(int initial, int onChunkDelivery) {
137+
if (initial <= 0 || onChunkDelivery <= 0) {
138+
throw new IllegalArgumentException("Credits must be positive");
139+
}
140+
this.initialCredits = initial;
141+
this.additionalCredits = onChunkDelivery;
142+
return this;
143+
}
144+
133145
StreamConsumerBuilder lazyInit(boolean lazyInit) {
134146
this.lazyInit = lazyInit;
135147
return this;
@@ -192,7 +204,9 @@ public Consumer build() {
192204
this.lazyInit,
193205
this.subscriptionListener,
194206
this.subscriptionProperties,
195-
this.consumerUpdateListener);
207+
this.consumerUpdateListener,
208+
this.initialCredits,
209+
this.additionalCredits);
196210
environment.addConsumer((StreamConsumer) consumer);
197211
} else {
198212
if (Utils.isSac(this.subscriptionProperties)) {

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,9 @@ Runnable registerConsumer(
659659
SubscriptionListener subscriptionListener,
660660
Runnable trackingClosingCallback,
661661
MessageHandler messageHandler,
662-
Map<String, String> subscriptionProperties) {
662+
Map<String, String> subscriptionProperties,
663+
int initialCredits,
664+
int additionalCredits) {
663665
Runnable closingCallback =
664666
this.consumersCoordinator.subscribe(
665667
consumer,
@@ -669,7 +671,9 @@ Runnable registerConsumer(
669671
subscriptionListener,
670672
trackingClosingCallback,
671673
messageHandler,
672-
subscriptionProperties);
674+
subscriptionProperties,
675+
initialCredits,
676+
additionalCredits);
673677
return closingCallback;
674678
}
675679

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.rabbitmq.stream.impl.Client;
4848
import com.rabbitmq.stream.metrics.MetricsCollector;
4949
import com.rabbitmq.stream.perf.ShutdownService.CloseCallback;
50+
import com.rabbitmq.stream.perf.Utils.CreditSettings;
5051
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
5152
import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector;
5253
import io.micrometer.core.instrument.Counter;
@@ -425,6 +426,13 @@ public void setMaxSegmentSize(ByteCapacity in) {
425426
defaultValue = "false")
426427
private boolean metricsCommandLineArguments;
427428

429+
@CommandLine.Option(
430+
names = {"--credits", "-cr"},
431+
description = "initial and additional credits for subscriptions",
432+
defaultValue = "10:1",
433+
converter = Utils.CreditsTypeConverter.class)
434+
private CreditSettings credits;
435+
428436
private MetricsCollector metricsCollector;
429437
private PerformanceMetrics performanceMetrics;
430438
private List<Monitoring> monitorings;
@@ -883,8 +891,11 @@ public Integer call() throws Exception {
883891

884892
AtomicLong messageCount = new AtomicLong(0);
885893
String stream = stream(streams, i);
886-
ConsumerBuilder consumerBuilder = environment.consumerBuilder();
887-
consumerBuilder = consumerBuilder.offset(this.offset);
894+
ConsumerBuilder consumerBuilder =
895+
environment
896+
.consumerBuilder()
897+
.offset(this.offset)
898+
.credits(this.credits.initial(), this.credits.additional());
888899

889900
if (this.superStreams) {
890901
consumerBuilder.superStream(stream);

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,69 @@ public Integer convert(String input) {
647647
}
648648
}
649649

650+
static class CreditsTypeConverter implements CommandLine.ITypeConverter<CreditSettings> {
651+
652+
@Override
653+
public CreditSettings convert(String input) {
654+
String errorMessage =
655+
input + " is not a valid credits setting, " + "valid example values: 20:1, 15";
656+
if (input == null || input.trim().isEmpty()) {
657+
typeConversionException(errorMessage);
658+
}
659+
if (input.contains(":") || input.contains("-")) {
660+
String separator = input.contains(":") ? ":" : "-";
661+
String[] split = input.split(separator);
662+
if (split.length != 2) {
663+
typeConversionException(errorMessage);
664+
} else {
665+
int[] credits =
666+
Arrays.stream(split)
667+
.mapToInt(Integer::valueOf)
668+
.peek(
669+
c -> {
670+
if (c <= 0) {
671+
typeConversionException("credit values must be positive");
672+
}
673+
})
674+
.toArray();
675+
return new CreditSettings(credits[0], credits[1]);
676+
}
677+
}
678+
try {
679+
int value = Integer.parseInt(input);
680+
if (value <= 0) {
681+
typeConversionException("credit values must be positive");
682+
}
683+
return new CreditSettings(value, 1);
684+
} catch (Exception e) {
685+
typeConversionException(errorMessage);
686+
}
687+
return new CreditSettings(10, 1);
688+
}
689+
}
690+
691+
static class CreditSettings {
692+
693+
private final int initial, additional;
694+
695+
CreditSettings(int initial, int additional) {
696+
this.initial = initial;
697+
this.additional = additional;
698+
}
699+
700+
int initial() {
701+
return this.initial;
702+
}
703+
704+
int additional() {
705+
return this.additional;
706+
}
707+
}
708+
709+
private static void typeConversionException(String message) {
710+
throw new TypeConversionException(message);
711+
}
712+
650713
static class CompressionTypeConverter implements CommandLine.ITypeConverter<Compression> {
651714

652715
@Override

0 commit comments

Comments
 (0)