Skip to content

Commit 35c1b0d

Browse files
committed
Test partition going back to original consumer
No new messages could arrive because of credit starvation, as credit providing request were not taking into account for inactive subscription. This tests a server-side fix. The test is disabled for now. References rabbitmq/rabbitmq-server#7638
1 parent 8e7ed02 commit 35c1b0d

File tree

2 files changed

+75
-9
lines changed

2 files changed

+75
-9
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -579,11 +579,17 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
579579
};
580580

581581
CreditNotification creditNotification =
582-
(subscriptionId, responseCode) ->
583-
LOGGER.debug(
584-
"Received credit notification for subscription {}: {}",
585-
subscriptionId & 0xFF,
586-
Utils.formatConstant(responseCode));
582+
(subscriptionId, responseCode) -> {
583+
SubscriptionTracker subscriptionTracker =
584+
subscriptionTrackers.get(subscriptionId & 0xFF);
585+
String stream = subscriptionTracker == null ? "?" : subscriptionTracker.stream;
586+
LOGGER.debug(
587+
"Received credit notification for subscription {} (stream '{}'): {}",
588+
subscriptionId & 0xFF,
589+
stream,
590+
Utils.formatConstant(responseCode));
591+
};
592+
587593
MessageListener messageListener =
588594
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
589595
SubscriptionTracker subscriptionTracker =

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,13 @@
3535
import java.time.Duration;
3636
import java.util.Collections;
3737
import java.util.List;
38+
import java.util.Set;
3839
import java.util.concurrent.ConcurrentHashMap;
3940
import java.util.concurrent.ConcurrentMap;
4041
import java.util.concurrent.CountDownLatch;
4142
import java.util.concurrent.atomic.AtomicInteger;
42-
import org.junit.jupiter.api.AfterEach;
43-
import org.junit.jupiter.api.BeforeEach;
44-
import org.junit.jupiter.api.Test;
45-
import org.junit.jupiter.api.TestInfo;
43+
import java.util.concurrent.atomic.AtomicReference;
44+
import org.junit.jupiter.api.*;
4645
import org.junit.jupiter.api.extension.ExtendWith;
4746

4847
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@@ -300,4 +299,65 @@ void autoOffsetTrackingShouldStoreOffsetZero() throws Exception {
300299
});
301300
}));
302301
}
302+
303+
@Test
304+
@Disabled
305+
void rebalancedPartitionShouldGetMessagesWhenItComesBackToOriginalConsumerInstance()
306+
throws Exception {
307+
declareSuperStreamTopology(connection, superStream, partitionCount);
308+
Client client = cf.get();
309+
List<String> partitions = client.partitions(superStream);
310+
int messageCount = 10_000;
311+
publishToPartitions(cf, partitions, messageCount);
312+
String consumerName = "my-app";
313+
Set<String> receivedPartitions = ConcurrentHashMap.newKeySet(partitionCount);
314+
Runnable processing =
315+
() -> {
316+
try {
317+
Thread.sleep(10);
318+
} catch (InterruptedException e) {
319+
// OK
320+
}
321+
};
322+
Consumer consumer1 =
323+
environment
324+
.consumerBuilder()
325+
.superStream(superStream)
326+
.singleActiveConsumer()
327+
.offset(OffsetSpecification.first())
328+
.name(consumerName)
329+
.autoTrackingStrategy()
330+
.messageCountBeforeStorage(messageCount / partitionCount / 50)
331+
.builder()
332+
.messageHandler(
333+
(context, message) -> {
334+
receivedPartitions.add(context.stream());
335+
processing.run();
336+
})
337+
.build();
338+
waitAtMost(() -> receivedPartitions.size() == partitions.size());
339+
340+
AtomicReference<String> partition = new AtomicReference<>();
341+
Consumer consumer2 =
342+
environment
343+
.consumerBuilder()
344+
.superStream(superStream)
345+
.singleActiveConsumer()
346+
.offset(OffsetSpecification.first())
347+
.name(consumerName)
348+
.autoTrackingStrategy()
349+
.messageCountBeforeStorage(messageCount / partitionCount / 50)
350+
.builder()
351+
.messageHandler(
352+
(context, message) -> {
353+
partition.set(context.stream());
354+
processing.run();
355+
})
356+
.build();
357+
waitAtMost(() -> partition.get() != null);
358+
consumer2.close();
359+
receivedPartitions.clear();
360+
waitAtMost(() -> receivedPartitions.size() == partitions.size());
361+
consumer1.close();
362+
}
303363
}

0 commit comments

Comments
 (0)